]> git.ipfire.org Git - thirdparty/postgresql.git/commitdiff
Consolidate replication origin session globals into a single struct.
authorMasahiko Sawada <msawada@postgresql.org>
Wed, 28 Jan 2026 20:26:22 +0000 (12:26 -0800)
committerMasahiko Sawada <msawada@postgresql.org>
Wed, 28 Jan 2026 20:26:22 +0000 (12:26 -0800)
This commit moves the separate global variables for replication origin
state into a single ReplOriginXactState struct. This groups logically
related variables, which improves code readability and simplifies
state management (e.g., resetting the state) by handling them as a
unit.

Author: Chao Li <lic@highgo.com>
Suggested-by: Ashutosh Bapat <ashutosh.bapat.oss@gmail.com>
Reviewed-by: Masahiko Sawada <sawada.mshk@gmail.com>
Reviewed-by: Ashutosh Bapat <ashutosh.bapat.oss@gmail.com>
Discussion: https://postgr.es/m/CAEoWx2=pYvfRthXHTzSrOsf5_FfyY4zJyK4zV2v4W=yjUij1cA@mail.gmail.com

src/backend/access/transam/twophase.c
src/backend/access/transam/xact.c
src/backend/access/transam/xloginsert.c
src/backend/replication/logical/applyparallelworker.c
src/backend/replication/logical/origin.c
src/backend/replication/logical/tablesync.c
src/backend/replication/logical/worker.c
src/include/replication/origin.h
src/tools/pgindent/typedefs.list

index 71dd3a9a7efb335469d2a6f5436f3ce4e66ff72f..601ce3faa64b3475f892e21154ace11926140051 100644 (file)
@@ -1157,13 +1157,13 @@ EndPrepare(GlobalTransaction gxact)
        Assert(hdr->magic == TWOPHASE_MAGIC);
        hdr->total_len = records.total_len + sizeof(pg_crc32c);
 
-       replorigin = (replorigin_session_origin != InvalidReplOriginId &&
-                                 replorigin_session_origin != DoNotReplicateId);
+       replorigin = (replorigin_xact_state.origin != InvalidReplOriginId &&
+                                 replorigin_xact_state.origin != DoNotReplicateId);
 
        if (replorigin)
        {
-               hdr->origin_lsn = replorigin_session_origin_lsn;
-               hdr->origin_timestamp = replorigin_session_origin_timestamp;
+               hdr->origin_lsn = replorigin_xact_state.origin_lsn;
+               hdr->origin_timestamp = replorigin_xact_state.origin_timestamp;
        }
 
        /*
@@ -1211,7 +1211,7 @@ EndPrepare(GlobalTransaction gxact)
        if (replorigin)
        {
                /* Move LSNs forward for this replication origin */
-               replorigin_session_advance(replorigin_session_origin_lsn,
+               replorigin_session_advance(replorigin_xact_state.origin_lsn,
                                                                   gxact->prepare_end_lsn);
        }
 
@@ -2330,8 +2330,8 @@ RecordTransactionCommitPrepared(TransactionId xid,
         * Are we using the replication origins feature?  Or, in other words, are
         * we replaying remote actions?
         */
-       replorigin = (replorigin_session_origin != InvalidReplOriginId &&
-                                 replorigin_session_origin != DoNotReplicateId);
+       replorigin = (replorigin_xact_state.origin != InvalidReplOriginId &&
+                                 replorigin_xact_state.origin != DoNotReplicateId);
 
        /* Load the injection point before entering the critical section */
        INJECTION_POINT_LOAD("commit-after-delay-checkpoint");
@@ -2376,23 +2376,23 @@ RecordTransactionCommitPrepared(TransactionId xid,
 
        if (replorigin)
                /* Move LSNs forward for this replication origin */
-               replorigin_session_advance(replorigin_session_origin_lsn,
+               replorigin_session_advance(replorigin_xact_state.origin_lsn,
                                                                   XactLastRecEnd);
 
        /*
         * Record commit timestamp.  The value comes from plain commit timestamp
         * if replorigin is not enabled, or replorigin already set a value for us
-        * in replorigin_session_origin_timestamp otherwise.
+        * in replorigin_xact_state.origin_timestamp otherwise.
         *
         * We don't need to WAL-log anything here, as the commit record written
         * above already contains the data.
         */
-       if (!replorigin || replorigin_session_origin_timestamp == 0)
-               replorigin_session_origin_timestamp = committs;
+       if (!replorigin || replorigin_xact_state.origin_timestamp == 0)
+               replorigin_xact_state.origin_timestamp = committs;
 
        TransactionTreeSetCommitTsData(xid, nchildren, children,
-                                                                  replorigin_session_origin_timestamp,
-                                                                  replorigin_session_origin);
+                                                                  replorigin_xact_state.origin_timestamp,
+                                                                  replorigin_xact_state.origin);
 
        /*
         * We don't currently try to sleep before flush here ... nor is there any
@@ -2445,8 +2445,8 @@ RecordTransactionAbortPrepared(TransactionId xid,
         * Are we using the replication origins feature?  Or, in other words, are
         * we replaying remote actions?
         */
-       replorigin = (replorigin_session_origin != InvalidReplOriginId &&
-                                 replorigin_session_origin != DoNotReplicateId);
+       replorigin = (replorigin_xact_state.origin != InvalidReplOriginId &&
+                                 replorigin_xact_state.origin != DoNotReplicateId);
 
        /*
         * Catch the scenario where we aborted partway through
@@ -2472,7 +2472,7 @@ RecordTransactionAbortPrepared(TransactionId xid,
 
        if (replorigin)
                /* Move LSNs forward for this replication origin */
-               replorigin_session_advance(replorigin_session_origin_lsn,
+               replorigin_session_advance(replorigin_xact_state.origin_lsn,
                                                                   XactLastRecEnd);
 
        /* Always flush, since we're about to remove the 2PC state file */
index 12a1505b53941feb6368705c9a1dbc8c51d1a434..eba4f063168a57d12ecb59c06e763f35d0b984e5 100644 (file)
@@ -1413,8 +1413,8 @@ RecordTransactionCommit(void)
                 * Are we using the replication origins feature?  Or, in other words,
                 * are we replaying remote actions?
                 */
-               replorigin = (replorigin_session_origin != InvalidReplOriginId &&
-                                         replorigin_session_origin != DoNotReplicateId);
+               replorigin = (replorigin_xact_state.origin != InvalidReplOriginId &&
+                                         replorigin_xact_state.origin != DoNotReplicateId);
 
                /*
                 * Mark ourselves as within our "commit critical section".  This
@@ -1462,25 +1462,25 @@ RecordTransactionCommit(void)
 
                if (replorigin)
                        /* Move LSNs forward for this replication origin */
-                       replorigin_session_advance(replorigin_session_origin_lsn,
+                       replorigin_session_advance(replorigin_xact_state.origin_lsn,
                                                                           XactLastRecEnd);
 
                /*
                 * Record commit timestamp.  The value comes from plain commit
                 * timestamp if there's no replication origin; otherwise, the
-                * timestamp was already set in replorigin_session_origin_timestamp by
-                * replication.
+                * timestamp was already set in replorigin_xact_state.origin_timestamp
+                * by replication.
                 *
                 * We don't need to WAL-log anything here, as the commit record
                 * written above already contains the data.
                 */
 
-               if (!replorigin || replorigin_session_origin_timestamp == 0)
-                       replorigin_session_origin_timestamp = GetCurrentTransactionStopTimestamp();
+               if (!replorigin || replorigin_xact_state.origin_timestamp == 0)
+                       replorigin_xact_state.origin_timestamp = GetCurrentTransactionStopTimestamp();
 
                TransactionTreeSetCommitTsData(xid, nchildren, children,
-                                                                          replorigin_session_origin_timestamp,
-                                                                          replorigin_session_origin);
+                                                                          replorigin_xact_state.origin_timestamp,
+                                                                          replorigin_xact_state.origin);
        }
 
        /*
@@ -1810,8 +1810,8 @@ RecordTransactionAbort(bool isSubXact)
         * Are we using the replication origins feature?  Or, in other words, are
         * we replaying remote actions?
         */
-       replorigin = (replorigin_session_origin != InvalidReplOriginId &&
-                                 replorigin_session_origin != DoNotReplicateId);
+       replorigin = (replorigin_xact_state.origin != InvalidReplOriginId &&
+                                 replorigin_xact_state.origin != DoNotReplicateId);
 
        /* Fetch the data we need for the abort record */
        nrels = smgrGetPendingDeletes(false, &rels);
@@ -1838,7 +1838,7 @@ RecordTransactionAbort(bool isSubXact)
 
        if (replorigin)
                /* Move LSNs forward for this replication origin */
-               replorigin_session_advance(replorigin_session_origin_lsn,
+               replorigin_session_advance(replorigin_xact_state.origin_lsn,
                                                                   XactLastRecEnd);
 
        /*
@@ -5928,12 +5928,12 @@ XactLogCommitRecord(TimestampTz commit_time,
        }
 
        /* dump transaction origin information */
-       if (replorigin_session_origin != InvalidReplOriginId)
+       if (replorigin_xact_state.origin != InvalidReplOriginId)
        {
                xl_xinfo.xinfo |= XACT_XINFO_HAS_ORIGIN;
 
-               xl_origin.origin_lsn = replorigin_session_origin_lsn;
-               xl_origin.origin_timestamp = replorigin_session_origin_timestamp;
+               xl_origin.origin_lsn = replorigin_xact_state.origin_lsn;
+               xl_origin.origin_timestamp = replorigin_xact_state.origin_timestamp;
        }
 
        if (xl_xinfo.xinfo != 0)
@@ -6081,12 +6081,12 @@ XactLogAbortRecord(TimestampTz abort_time,
         * Dump transaction origin information. We need this during recovery to
         * update the replication origin progress.
         */
-       if (replorigin_session_origin != InvalidReplOriginId)
+       if (replorigin_xact_state.origin != InvalidReplOriginId)
        {
                xl_xinfo.xinfo |= XACT_XINFO_HAS_ORIGIN;
 
-               xl_origin.origin_lsn = replorigin_session_origin_lsn;
-               xl_origin.origin_timestamp = replorigin_session_origin_timestamp;
+               xl_origin.origin_lsn = replorigin_xact_state.origin_lsn;
+               xl_origin.origin_timestamp = replorigin_xact_state.origin_timestamp;
        }
 
        if (xl_xinfo.xinfo != 0)
index e2db6adc382e107577ef59da771c300164f966e4..d3acaa636c3ed448445e85e6c998a33ee64200e8 100644 (file)
@@ -861,11 +861,11 @@ XLogRecordAssemble(RmgrId rmid, uint8 info,
 
        /* followed by the record's origin, if any */
        if ((curinsert_flags & XLOG_INCLUDE_ORIGIN) &&
-               replorigin_session_origin != InvalidReplOriginId)
+               replorigin_xact_state.origin != InvalidReplOriginId)
        {
                *(scratch++) = (char) XLR_BLOCK_ID_ORIGIN;
-               memcpy(scratch, &replorigin_session_origin, sizeof(replorigin_session_origin));
-               scratch += sizeof(replorigin_session_origin);
+               memcpy(scratch, &replorigin_xact_state.origin, sizeof(replorigin_xact_state.origin));
+               scratch += sizeof(replorigin_xact_state.origin);
        }
 
        /* followed by toplevel XID, if not already included in previous record */
index 2f5d74180dcc617453100a7a074abd652a805b60..8a01f16a2ca880acc49bc05c48881e672b4abefb 100644 (file)
@@ -962,7 +962,7 @@ ParallelApplyWorkerMain(Datum main_arg)
         * origin which was already acquired by its leader process.
         */
        replorigin_session_setup(originid, MyLogicalRepWorker->leader_pid);
-       replorigin_session_origin = originid;
+       replorigin_xact_state.origin = originid;
        CommitTransactionCommand();
 
        /*
@@ -1430,8 +1430,8 @@ pa_stream_abort(LogicalRepStreamAbortData *abort_data)
         * Update origin state so we can restart streaming from correct position
         * in case of crash.
         */
-       replorigin_session_origin_lsn = abort_data->abort_lsn;
-       replorigin_session_origin_timestamp = abort_data->abort_time;
+       replorigin_xact_state.origin_lsn = abort_data->abort_lsn;
+       replorigin_xact_state.origin_timestamp = abort_data->abort_time;
 
        /*
         * If the two XIDs are the same, it's in fact abort of toplevel xact, so
index 7704cc5ff1b4cf5a8f694304d18c15d0eabc4ff0..c3271a6fd0ec3757d1c01ef2a99ba3ba9b17997e 100644 (file)
@@ -162,10 +162,12 @@ typedef struct ReplicationStateCtl
        ReplicationState states[FLEXIBLE_ARRAY_MEMBER];
 } ReplicationStateCtl;
 
-/* external variables */
-ReplOriginId replorigin_session_origin = InvalidReplOriginId;  /* assumed identity */
-XLogRecPtr     replorigin_session_origin_lsn = InvalidXLogRecPtr;
-TimestampTz replorigin_session_origin_timestamp = 0;
+/* Global variable for per-transaction replication origin state */
+ReplOriginXactState replorigin_xact_state = {
+       .origin = InvalidReplOriginId,  /* assumed identity */
+       .origin_lsn = InvalidXLogRecPtr,
+       .origin_timestamp = 0
+};
 
 /*
  * Base address into a shared memory array of replication states of size
@@ -902,7 +904,7 @@ replorigin_redo(XLogReaderState *record)
  * Tell the replication origin progress machinery that a commit from 'node'
  * that originated at the LSN remote_commit on the remote node was replayed
  * successfully and that we don't need to do so again. In combination with
- * setting up replorigin_session_origin_lsn and replorigin_session_origin
+ * setting up replorigin_xact_state {.origin_lsn, .origin_timestamp}
  * that ensures we won't lose knowledge about that after a crash if the
  * transaction had a persistent effect (think of asynchronous commits).
  *
@@ -1349,10 +1351,10 @@ replorigin_session_get_progress(bool flush)
 void
 replorigin_xact_clear(bool clear_origin)
 {
-       replorigin_session_origin_lsn = InvalidXLogRecPtr;
-       replorigin_session_origin_timestamp = 0;
+       replorigin_xact_state.origin_lsn = InvalidXLogRecPtr;
+       replorigin_xact_state.origin_timestamp = 0;
        if (clear_origin)
-               replorigin_session_origin = InvalidReplOriginId;
+               replorigin_xact_state.origin = InvalidReplOriginId;
 }
 
 
@@ -1462,7 +1464,7 @@ pg_replication_origin_session_setup(PG_FUNCTION_ARGS)
        pid = PG_GETARG_INT32(1);
        replorigin_session_setup(origin, pid);
 
-       replorigin_session_origin = origin;
+       replorigin_xact_state.origin = origin;
 
        pfree(name);
 
@@ -1492,7 +1494,7 @@ pg_replication_origin_session_is_setup(PG_FUNCTION_ARGS)
 {
        replorigin_check_prerequisites(false, false);
 
-       PG_RETURN_BOOL(replorigin_session_origin != InvalidReplOriginId);
+       PG_RETURN_BOOL(replorigin_xact_state.origin != InvalidReplOriginId);
 }
 
 
@@ -1536,8 +1538,8 @@ pg_replication_origin_xact_setup(PG_FUNCTION_ARGS)
                                (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
                                 errmsg("no replication origin is configured")));
 
-       replorigin_session_origin_lsn = location;
-       replorigin_session_origin_timestamp = PG_GETARG_TIMESTAMPTZ(1);
+       replorigin_xact_state.origin_lsn = location;
+       replorigin_xact_state.origin_timestamp = PG_GETARG_TIMESTAMPTZ(1);
 
        PG_RETURN_VOID();
 }
index 15a59759645ad1fcbb970674e3ca5bfcdefeac32..19a3c21a86372b6c2cd40b7a5a2510f7a4e15270 100644 (file)
@@ -1318,7 +1318,7 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
                 */
                originid = replorigin_by_name(originname, false);
                replorigin_session_setup(originid, 0);
-               replorigin_session_origin = originid;
+               replorigin_xact_state.origin = originid;
                *origin_startpos = replorigin_session_get_progress(false);
 
                CommitTransactionCommand();
@@ -1405,7 +1405,7 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
        UnlockRelationOid(ReplicationOriginRelationId, RowExclusiveLock);
 
        replorigin_session_setup(originid, 0);
-       replorigin_session_origin = originid;
+       replorigin_xact_state.origin = originid;
 
        /*
         * If the user did not opt to run as the owner of the subscription
index 23996a833531d79c1ebffef8a92517887f14ebba..32725c48623b30ed2073d237e5ee4448b67864a6 100644 (file)
@@ -1318,8 +1318,8 @@ apply_handle_prepare_internal(LogicalRepPreparedTxnData *prepare_data)
         * Update origin state so we can restart streaming from correct position
         * in case of crash.
         */
-       replorigin_session_origin_lsn = prepare_data->end_lsn;
-       replorigin_session_origin_timestamp = prepare_data->prepare_time;
+       replorigin_xact_state.origin_lsn = prepare_data->end_lsn;
+       replorigin_xact_state.origin_timestamp = prepare_data->prepare_time;
 
        PrepareTransactionBlock(gid);
 }
@@ -1421,8 +1421,8 @@ apply_handle_commit_prepared(StringInfo s)
         * Update origin state so we can restart streaming from correct position
         * in case of crash.
         */
-       replorigin_session_origin_lsn = prepare_data.end_lsn;
-       replorigin_session_origin_timestamp = prepare_data.commit_time;
+       replorigin_xact_state.origin_lsn = prepare_data.end_lsn;
+       replorigin_xact_state.origin_timestamp = prepare_data.commit_time;
 
        FinishPreparedTransaction(gid, true);
        end_replication_step();
@@ -1479,8 +1479,8 @@ apply_handle_rollback_prepared(StringInfo s)
                 * Update origin state so we can restart streaming from correct
                 * position in case of crash.
                 */
-               replorigin_session_origin_lsn = rollback_data.rollback_end_lsn;
-               replorigin_session_origin_timestamp = rollback_data.rollback_time;
+               replorigin_xact_state.origin_lsn = rollback_data.rollback_end_lsn;
+               replorigin_xact_state.origin_timestamp = rollback_data.rollback_time;
 
                /* There is no transaction when ABORT/ROLLBACK PREPARED is called */
                begin_replication_step();
@@ -2526,8 +2526,8 @@ apply_handle_commit_internal(LogicalRepCommitData *commit_data)
                 * Update origin state so we can restart streaming from correct
                 * position in case of crash.
                 */
-               replorigin_session_origin_lsn = commit_data->end_lsn;
-               replorigin_session_origin_timestamp = commit_data->committime;
+               replorigin_xact_state.origin_lsn = commit_data->end_lsn;
+               replorigin_xact_state.origin_timestamp = commit_data->committime;
 
                CommitTransactionCommand();
 
@@ -2940,7 +2940,7 @@ apply_handle_update_internal(ApplyExecutionData *edata,
                 */
                if (GetTupleTransactionInfo(localslot, &conflicttuple.xmin,
                                                                        &conflicttuple.origin, &conflicttuple.ts) &&
-                       conflicttuple.origin != replorigin_session_origin)
+                       conflicttuple.origin != replorigin_xact_state.origin)
                {
                        TupleTableSlot *newslot;
 
@@ -2982,7 +2982,7 @@ apply_handle_update_internal(ApplyExecutionData *edata,
                                                                           &conflicttuple.xmin,
                                                                           &conflicttuple.origin,
                                                                           &conflicttuple.ts) &&
-                       conflicttuple.origin != replorigin_session_origin)
+                       conflicttuple.origin != replorigin_xact_state.origin)
                        type = CT_UPDATE_DELETED;
                else
                        type = CT_UPDATE_MISSING;
@@ -3135,7 +3135,7 @@ apply_handle_delete_internal(ApplyExecutionData *edata,
                 */
                if (GetTupleTransactionInfo(localslot, &conflicttuple.xmin,
                                                                        &conflicttuple.origin, &conflicttuple.ts) &&
-                       conflicttuple.origin != replorigin_session_origin)
+                       conflicttuple.origin != replorigin_xact_state.origin)
                {
                        conflicttuple.slot = localslot;
                        ReportApplyConflict(estate, relinfo, LOG, CT_DELETE_ORIGIN_DIFFERS,
@@ -3477,7 +3477,7 @@ apply_handle_tuple_routing(ApplyExecutionData *edata,
                                                                                                   &conflicttuple.xmin,
                                                                                                   &conflicttuple.origin,
                                                                                                   &conflicttuple.ts) &&
-                                               conflicttuple.origin != replorigin_session_origin)
+                                               conflicttuple.origin != replorigin_xact_state.origin)
                                                type = CT_UPDATE_DELETED;
                                        else
                                                type = CT_UPDATE_MISSING;
@@ -3503,7 +3503,7 @@ apply_handle_tuple_routing(ApplyExecutionData *edata,
                                if (GetTupleTransactionInfo(localslot, &conflicttuple.xmin,
                                                                                        &conflicttuple.origin,
                                                                                        &conflicttuple.ts) &&
-                                       conflicttuple.origin != replorigin_session_origin)
+                                       conflicttuple.origin != replorigin_xact_state.origin)
                                {
                                        TupleTableSlot *newslot;
 
@@ -5652,7 +5652,7 @@ run_apply_worker(void)
        if (!OidIsValid(originid))
                originid = replorigin_create(originname);
        replorigin_session_setup(originid, 0);
-       replorigin_session_origin = originid;
+       replorigin_xact_state.origin = originid;
        origin_startpos = replorigin_session_get_progress(false);
        CommitTransactionCommand();
 
index bc0a16ecc7e6c8e0b5d3b0475d5a01d2402e0352..eb46b41b4b704e13fa5d0dbe4d46155de9f61fec 100644 (file)
@@ -40,9 +40,14 @@ typedef struct xl_replorigin_drop
  */
 #define MAX_RONAME_LEN 512
 
-extern PGDLLIMPORT ReplOriginId replorigin_session_origin;
-extern PGDLLIMPORT XLogRecPtr replorigin_session_origin_lsn;
-extern PGDLLIMPORT TimestampTz replorigin_session_origin_timestamp;
+typedef struct ReplOriginXactState
+{
+       ReplOriginId origin;
+       XLogRecPtr      origin_lsn;
+       TimestampTz origin_timestamp;
+} ReplOriginXactState;
+
+extern PGDLLIMPORT ReplOriginXactState replorigin_xact_state;
 
 /* GUCs */
 extern PGDLLIMPORT int max_active_replication_origins;
index 7ad16c8ad2332c550a232d06d47c73b8069c4068..34374df0d671935add8f7d9e917b63a32838b931 100644 (file)
@@ -2577,6 +2577,7 @@ ReorderBufferUpdateProgressTxnCB
 ReorderTuple
 ReparameterizeForeignPathByChild_function
 ReplOriginId
+ReplOriginXactState
 ReplaceVarsFromTargetList_context
 ReplaceVarsNoMatchOption
 ReplaceWrapOption