Assert(hdr->magic == TWOPHASE_MAGIC);
hdr->total_len = records.total_len + sizeof(pg_crc32c);
- replorigin = (replorigin_session_origin != InvalidReplOriginId &&
- replorigin_session_origin != DoNotReplicateId);
+ replorigin = (replorigin_xact_state.origin != InvalidReplOriginId &&
+ replorigin_xact_state.origin != DoNotReplicateId);
if (replorigin)
{
- hdr->origin_lsn = replorigin_session_origin_lsn;
- hdr->origin_timestamp = replorigin_session_origin_timestamp;
+ hdr->origin_lsn = replorigin_xact_state.origin_lsn;
+ hdr->origin_timestamp = replorigin_xact_state.origin_timestamp;
}
/*
if (replorigin)
{
/* Move LSNs forward for this replication origin */
- replorigin_session_advance(replorigin_session_origin_lsn,
+ replorigin_session_advance(replorigin_xact_state.origin_lsn,
gxact->prepare_end_lsn);
}
* Are we using the replication origins feature? Or, in other words, are
* we replaying remote actions?
*/
- replorigin = (replorigin_session_origin != InvalidReplOriginId &&
- replorigin_session_origin != DoNotReplicateId);
+ replorigin = (replorigin_xact_state.origin != InvalidReplOriginId &&
+ replorigin_xact_state.origin != DoNotReplicateId);
/* Load the injection point before entering the critical section */
INJECTION_POINT_LOAD("commit-after-delay-checkpoint");
if (replorigin)
/* Move LSNs forward for this replication origin */
- replorigin_session_advance(replorigin_session_origin_lsn,
+ replorigin_session_advance(replorigin_xact_state.origin_lsn,
XactLastRecEnd);
/*
* Record commit timestamp. The value comes from plain commit timestamp
* if replorigin is not enabled, or replorigin already set a value for us
- * in replorigin_session_origin_timestamp otherwise.
+ * in replorigin_xact_state.origin_timestamp otherwise.
*
* We don't need to WAL-log anything here, as the commit record written
* above already contains the data.
*/
- if (!replorigin || replorigin_session_origin_timestamp == 0)
- replorigin_session_origin_timestamp = committs;
+ if (!replorigin || replorigin_xact_state.origin_timestamp == 0)
+ replorigin_xact_state.origin_timestamp = committs;
TransactionTreeSetCommitTsData(xid, nchildren, children,
- replorigin_session_origin_timestamp,
- replorigin_session_origin);
+ replorigin_xact_state.origin_timestamp,
+ replorigin_xact_state.origin);
/*
* We don't currently try to sleep before flush here ... nor is there any
* Are we using the replication origins feature? Or, in other words, are
* we replaying remote actions?
*/
- replorigin = (replorigin_session_origin != InvalidReplOriginId &&
- replorigin_session_origin != DoNotReplicateId);
+ replorigin = (replorigin_xact_state.origin != InvalidReplOriginId &&
+ replorigin_xact_state.origin != DoNotReplicateId);
/*
* Catch the scenario where we aborted partway through
if (replorigin)
/* Move LSNs forward for this replication origin */
- replorigin_session_advance(replorigin_session_origin_lsn,
+ replorigin_session_advance(replorigin_xact_state.origin_lsn,
XactLastRecEnd);
/* Always flush, since we're about to remove the 2PC state file */
* Are we using the replication origins feature? Or, in other words,
* are we replaying remote actions?
*/
- replorigin = (replorigin_session_origin != InvalidReplOriginId &&
- replorigin_session_origin != DoNotReplicateId);
+ replorigin = (replorigin_xact_state.origin != InvalidReplOriginId &&
+ replorigin_xact_state.origin != DoNotReplicateId);
/*
* Mark ourselves as within our "commit critical section". This
if (replorigin)
/* Move LSNs forward for this replication origin */
- replorigin_session_advance(replorigin_session_origin_lsn,
+ replorigin_session_advance(replorigin_xact_state.origin_lsn,
XactLastRecEnd);
/*
* Record commit timestamp. The value comes from plain commit
* timestamp if there's no replication origin; otherwise, the
- * timestamp was already set in replorigin_session_origin_timestamp by
- * replication.
+ * timestamp was already set in replorigin_xact_state.origin_timestamp
+ * by replication.
*
* We don't need to WAL-log anything here, as the commit record
* written above already contains the data.
*/
- if (!replorigin || replorigin_session_origin_timestamp == 0)
- replorigin_session_origin_timestamp = GetCurrentTransactionStopTimestamp();
+ if (!replorigin || replorigin_xact_state.origin_timestamp == 0)
+ replorigin_xact_state.origin_timestamp = GetCurrentTransactionStopTimestamp();
TransactionTreeSetCommitTsData(xid, nchildren, children,
- replorigin_session_origin_timestamp,
- replorigin_session_origin);
+ replorigin_xact_state.origin_timestamp,
+ replorigin_xact_state.origin);
}
/*
* Are we using the replication origins feature? Or, in other words, are
* we replaying remote actions?
*/
- replorigin = (replorigin_session_origin != InvalidReplOriginId &&
- replorigin_session_origin != DoNotReplicateId);
+ replorigin = (replorigin_xact_state.origin != InvalidReplOriginId &&
+ replorigin_xact_state.origin != DoNotReplicateId);
/* Fetch the data we need for the abort record */
nrels = smgrGetPendingDeletes(false, &rels);
if (replorigin)
/* Move LSNs forward for this replication origin */
- replorigin_session_advance(replorigin_session_origin_lsn,
+ replorigin_session_advance(replorigin_xact_state.origin_lsn,
XactLastRecEnd);
/*
}
/* dump transaction origin information */
- if (replorigin_session_origin != InvalidReplOriginId)
+ if (replorigin_xact_state.origin != InvalidReplOriginId)
{
xl_xinfo.xinfo |= XACT_XINFO_HAS_ORIGIN;
- xl_origin.origin_lsn = replorigin_session_origin_lsn;
- xl_origin.origin_timestamp = replorigin_session_origin_timestamp;
+ xl_origin.origin_lsn = replorigin_xact_state.origin_lsn;
+ xl_origin.origin_timestamp = replorigin_xact_state.origin_timestamp;
}
if (xl_xinfo.xinfo != 0)
* Dump transaction origin information. We need this during recovery to
* update the replication origin progress.
*/
- if (replorigin_session_origin != InvalidReplOriginId)
+ if (replorigin_xact_state.origin != InvalidReplOriginId)
{
xl_xinfo.xinfo |= XACT_XINFO_HAS_ORIGIN;
- xl_origin.origin_lsn = replorigin_session_origin_lsn;
- xl_origin.origin_timestamp = replorigin_session_origin_timestamp;
+ xl_origin.origin_lsn = replorigin_xact_state.origin_lsn;
+ xl_origin.origin_timestamp = replorigin_xact_state.origin_timestamp;
}
if (xl_xinfo.xinfo != 0)
/* followed by the record's origin, if any */
if ((curinsert_flags & XLOG_INCLUDE_ORIGIN) &&
- replorigin_session_origin != InvalidReplOriginId)
+ replorigin_xact_state.origin != InvalidReplOriginId)
{
*(scratch++) = (char) XLR_BLOCK_ID_ORIGIN;
- memcpy(scratch, &replorigin_session_origin, sizeof(replorigin_session_origin));
- scratch += sizeof(replorigin_session_origin);
+ memcpy(scratch, &replorigin_xact_state.origin, sizeof(replorigin_xact_state.origin));
+ scratch += sizeof(replorigin_xact_state.origin);
}
/* followed by toplevel XID, if not already included in previous record */
* origin which was already acquired by its leader process.
*/
replorigin_session_setup(originid, MyLogicalRepWorker->leader_pid);
- replorigin_session_origin = originid;
+ replorigin_xact_state.origin = originid;
CommitTransactionCommand();
/*
* Update origin state so we can restart streaming from correct position
* in case of crash.
*/
- replorigin_session_origin_lsn = abort_data->abort_lsn;
- replorigin_session_origin_timestamp = abort_data->abort_time;
+ replorigin_xact_state.origin_lsn = abort_data->abort_lsn;
+ replorigin_xact_state.origin_timestamp = abort_data->abort_time;
/*
* If the two XIDs are the same, it's in fact abort of toplevel xact, so
ReplicationState states[FLEXIBLE_ARRAY_MEMBER];
} ReplicationStateCtl;
-/* external variables */
-ReplOriginId replorigin_session_origin = InvalidReplOriginId; /* assumed identity */
-XLogRecPtr replorigin_session_origin_lsn = InvalidXLogRecPtr;
-TimestampTz replorigin_session_origin_timestamp = 0;
+/* Global variable for per-transaction replication origin state */
+ReplOriginXactState replorigin_xact_state = {
+ .origin = InvalidReplOriginId, /* assumed identity */
+ .origin_lsn = InvalidXLogRecPtr,
+ .origin_timestamp = 0
+};
/*
* Base address into a shared memory array of replication states of size
* Tell the replication origin progress machinery that a commit from 'node'
* that originated at the LSN remote_commit on the remote node was replayed
* successfully and that we don't need to do so again. In combination with
- * setting up replorigin_session_origin_lsn and replorigin_session_origin
+ * setting up replorigin_xact_state {.origin_lsn, .origin_timestamp}
* that ensures we won't lose knowledge about that after a crash if the
* transaction had a persistent effect (think of asynchronous commits).
*
void
replorigin_xact_clear(bool clear_origin)
{
- replorigin_session_origin_lsn = InvalidXLogRecPtr;
- replorigin_session_origin_timestamp = 0;
+ replorigin_xact_state.origin_lsn = InvalidXLogRecPtr;
+ replorigin_xact_state.origin_timestamp = 0;
if (clear_origin)
- replorigin_session_origin = InvalidReplOriginId;
+ replorigin_xact_state.origin = InvalidReplOriginId;
}
pid = PG_GETARG_INT32(1);
replorigin_session_setup(origin, pid);
- replorigin_session_origin = origin;
+ replorigin_xact_state.origin = origin;
pfree(name);
{
replorigin_check_prerequisites(false, false);
- PG_RETURN_BOOL(replorigin_session_origin != InvalidReplOriginId);
+ PG_RETURN_BOOL(replorigin_xact_state.origin != InvalidReplOriginId);
}
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
errmsg("no replication origin is configured")));
- replorigin_session_origin_lsn = location;
- replorigin_session_origin_timestamp = PG_GETARG_TIMESTAMPTZ(1);
+ replorigin_xact_state.origin_lsn = location;
+ replorigin_xact_state.origin_timestamp = PG_GETARG_TIMESTAMPTZ(1);
PG_RETURN_VOID();
}
*/
originid = replorigin_by_name(originname, false);
replorigin_session_setup(originid, 0);
- replorigin_session_origin = originid;
+ replorigin_xact_state.origin = originid;
*origin_startpos = replorigin_session_get_progress(false);
CommitTransactionCommand();
UnlockRelationOid(ReplicationOriginRelationId, RowExclusiveLock);
replorigin_session_setup(originid, 0);
- replorigin_session_origin = originid;
+ replorigin_xact_state.origin = originid;
/*
* If the user did not opt to run as the owner of the subscription
* Update origin state so we can restart streaming from correct position
* in case of crash.
*/
- replorigin_session_origin_lsn = prepare_data->end_lsn;
- replorigin_session_origin_timestamp = prepare_data->prepare_time;
+ replorigin_xact_state.origin_lsn = prepare_data->end_lsn;
+ replorigin_xact_state.origin_timestamp = prepare_data->prepare_time;
PrepareTransactionBlock(gid);
}
* Update origin state so we can restart streaming from correct position
* in case of crash.
*/
- replorigin_session_origin_lsn = prepare_data.end_lsn;
- replorigin_session_origin_timestamp = prepare_data.commit_time;
+ replorigin_xact_state.origin_lsn = prepare_data.end_lsn;
+ replorigin_xact_state.origin_timestamp = prepare_data.commit_time;
FinishPreparedTransaction(gid, true);
end_replication_step();
* Update origin state so we can restart streaming from correct
* position in case of crash.
*/
- replorigin_session_origin_lsn = rollback_data.rollback_end_lsn;
- replorigin_session_origin_timestamp = rollback_data.rollback_time;
+ replorigin_xact_state.origin_lsn = rollback_data.rollback_end_lsn;
+ replorigin_xact_state.origin_timestamp = rollback_data.rollback_time;
/* There is no transaction when ABORT/ROLLBACK PREPARED is called */
begin_replication_step();
* Update origin state so we can restart streaming from correct
* position in case of crash.
*/
- replorigin_session_origin_lsn = commit_data->end_lsn;
- replorigin_session_origin_timestamp = commit_data->committime;
+ replorigin_xact_state.origin_lsn = commit_data->end_lsn;
+ replorigin_xact_state.origin_timestamp = commit_data->committime;
CommitTransactionCommand();
*/
if (GetTupleTransactionInfo(localslot, &conflicttuple.xmin,
&conflicttuple.origin, &conflicttuple.ts) &&
- conflicttuple.origin != replorigin_session_origin)
+ conflicttuple.origin != replorigin_xact_state.origin)
{
TupleTableSlot *newslot;
&conflicttuple.xmin,
&conflicttuple.origin,
&conflicttuple.ts) &&
- conflicttuple.origin != replorigin_session_origin)
+ conflicttuple.origin != replorigin_xact_state.origin)
type = CT_UPDATE_DELETED;
else
type = CT_UPDATE_MISSING;
*/
if (GetTupleTransactionInfo(localslot, &conflicttuple.xmin,
&conflicttuple.origin, &conflicttuple.ts) &&
- conflicttuple.origin != replorigin_session_origin)
+ conflicttuple.origin != replorigin_xact_state.origin)
{
conflicttuple.slot = localslot;
ReportApplyConflict(estate, relinfo, LOG, CT_DELETE_ORIGIN_DIFFERS,
&conflicttuple.xmin,
&conflicttuple.origin,
&conflicttuple.ts) &&
- conflicttuple.origin != replorigin_session_origin)
+ conflicttuple.origin != replorigin_xact_state.origin)
type = CT_UPDATE_DELETED;
else
type = CT_UPDATE_MISSING;
if (GetTupleTransactionInfo(localslot, &conflicttuple.xmin,
&conflicttuple.origin,
&conflicttuple.ts) &&
- conflicttuple.origin != replorigin_session_origin)
+ conflicttuple.origin != replorigin_xact_state.origin)
{
TupleTableSlot *newslot;
if (!OidIsValid(originid))
originid = replorigin_create(originname);
replorigin_session_setup(originid, 0);
- replorigin_session_origin = originid;
+ replorigin_xact_state.origin = originid;
origin_startpos = replorigin_session_get_progress(false);
CommitTransactionCommand();
*/
#define MAX_RONAME_LEN 512
-extern PGDLLIMPORT ReplOriginId replorigin_session_origin;
-extern PGDLLIMPORT XLogRecPtr replorigin_session_origin_lsn;
-extern PGDLLIMPORT TimestampTz replorigin_session_origin_timestamp;
+typedef struct ReplOriginXactState
+{
+ ReplOriginId origin;
+ XLogRecPtr origin_lsn;
+ TimestampTz origin_timestamp;
+} ReplOriginXactState;
+
+extern PGDLLIMPORT ReplOriginXactState replorigin_xact_state;
/* GUCs */
extern PGDLLIMPORT int max_active_replication_origins;
ReorderTuple
ReparameterizeForeignPathByChild_function
ReplOriginId
+ReplOriginXactState
ReplaceVarsFromTargetList_context
ReplaceVarsNoMatchOption
ReplaceWrapOption