int nrelations, Relation relations[],
ReorderBufferChange *change);
static bool pg_decode_filter(LogicalDecodingContext *ctx,
- RepOriginId origin_id);
+ ReplOriginId origin_id);
static void pg_decode_message(LogicalDecodingContext *ctx,
ReorderBufferTXN *txn, XLogRecPtr lsn,
bool transactional, const char *prefix,
static bool
pg_decode_filter(LogicalDecodingContext *ctx,
- RepOriginId origin_id)
+ ReplOriginId origin_id)
{
TestDecodingData *data = ctx->output_plugin_private;
- if (data->only_local && origin_id != InvalidRepOriginId)
+ if (data->only_local && origin_id != InvalidReplOriginId)
return true;
return false;
}
output plugin.
<programlisting>
typedef bool (*LogicalDecodeFilterByOriginCB) (struct LogicalDecodingContext *ctx,
- RepOriginId origin_id);
+ ReplOriginId origin_id);
</programlisting>
The <parameter>ctx</parameter> parameter has the same contents
as for the other callbacks. No information but the origin is
}
static void
-xact_desc_commit(StringInfo buf, uint8 info, xl_xact_commit *xlrec, RepOriginId origin_id)
+xact_desc_commit(StringInfo buf, uint8 info, xl_xact_commit *xlrec, ReplOriginId origin_id)
{
xl_xact_parsed_commit parsed;
}
static void
-xact_desc_abort(StringInfo buf, uint8 info, xl_xact_abort *xlrec, RepOriginId origin_id)
+xact_desc_abort(StringInfo buf, uint8 info, xl_xact_abort *xlrec, ReplOriginId origin_id)
{
xl_xact_parsed_abort parsed;
}
static void
-xact_desc_prepare(StringInfo buf, uint8 info, xl_xact_prepare *xlrec, RepOriginId origin_id)
+xact_desc_prepare(StringInfo buf, uint8 info, xl_xact_prepare *xlrec, ReplOriginId origin_id)
{
xl_xact_parsed_prepare parsed;
* Check if the replication origin has been set in this record in the same
* way as PrepareRedoAdd().
*/
- if (origin_id != InvalidRepOriginId)
+ if (origin_id != InvalidReplOriginId)
appendStringInfo(buf, "; origin: node %u, lsn %X/%08X, at %s",
origin_id,
LSN_FORMAT_ARGS(parsed.origin_lsn),
typedef struct CommitTimestampEntry
{
TimestampTz time;
- RepOriginId nodeid;
+ ReplOriginId nodeid;
} CommitTimestampEntry;
#define SizeOfCommitTimestampEntry (offsetof(CommitTimestampEntry, nodeid) + \
- sizeof(RepOriginId))
+ sizeof(ReplOriginId))
#define COMMIT_TS_XACTS_PER_PAGE \
(BLCKSZ / SizeOfCommitTimestampEntry)
static void SetXidCommitTsInPage(TransactionId xid, int nsubxids,
TransactionId *subxids, TimestampTz ts,
- RepOriginId nodeid, int64 pageno);
+ ReplOriginId nodeid, int64 pageno);
static void TransactionIdSetCommitTs(TransactionId xid, TimestampTz ts,
- RepOriginId nodeid, int slotno);
+ ReplOriginId nodeid, int slotno);
static void error_commit_ts_disabled(void);
static bool CommitTsPagePrecedes(int64 page1, int64 page2);
static void ActivateCommitTs(void);
void
TransactionTreeSetCommitTsData(TransactionId xid, int nsubxids,
TransactionId *subxids, TimestampTz timestamp,
- RepOriginId nodeid)
+ ReplOriginId nodeid)
{
int i;
TransactionId headxid;
static void
SetXidCommitTsInPage(TransactionId xid, int nsubxids,
TransactionId *subxids, TimestampTz ts,
- RepOriginId nodeid, int64 pageno)
+ ReplOriginId nodeid, int64 pageno)
{
LWLock *lock = SimpleLruGetBankLock(CommitTsCtl, pageno);
int slotno;
*/
static void
TransactionIdSetCommitTs(TransactionId xid, TimestampTz ts,
- RepOriginId nodeid, int slotno)
+ ReplOriginId nodeid, int slotno)
{
int entryno = TransactionIdToCTsEntry(xid);
CommitTimestampEntry entry;
*/
bool
TransactionIdGetCommitTsData(TransactionId xid, TimestampTz *ts,
- RepOriginId *nodeid)
+ ReplOriginId *nodeid)
{
int64 pageno = TransactionIdToCTsPage(xid);
int entryno = TransactionIdToCTsEntry(xid);
{
*ts = 0;
if (nodeid)
- *nodeid = InvalidRepOriginId;
+ *nodeid = InvalidReplOriginId;
return false;
}
* as NULL if not wanted.
*/
TransactionId
-GetLatestCommitTsData(TimestampTz *ts, RepOriginId *nodeid)
+GetLatestCommitTsData(TimestampTz *ts, ReplOriginId *nodeid)
{
TransactionId xid;
pg_last_committed_xact(PG_FUNCTION_ARGS)
{
TransactionId xid;
- RepOriginId nodeid;
+ ReplOriginId nodeid;
TimestampTz ts;
Datum values[3];
bool nulls[3];
pg_xact_commit_timestamp_origin(PG_FUNCTION_ARGS)
{
TransactionId xid = PG_GETARG_TRANSACTIONID(0);
- RepOriginId nodeid;
+ ReplOriginId nodeid;
TimestampTz ts;
Datum values[2];
bool nulls[2];
commitTsShared->xidLastCommit = InvalidTransactionId;
TIMESTAMP_NOBEGIN(commitTsShared->dataLastCommit.time);
- commitTsShared->dataLastCommit.nodeid = InvalidRepOriginId;
+ commitTsShared->dataLastCommit.nodeid = InvalidReplOriginId;
commitTsShared->commitTsActive = false;
}
else
commitTsShared->commitTsActive = false;
commitTsShared->xidLastCommit = InvalidTransactionId;
TIMESTAMP_NOBEGIN(commitTsShared->dataLastCommit.time);
- commitTsShared->dataLastCommit.nodeid = InvalidRepOriginId;
+ commitTsShared->dataLastCommit.nodeid = InvalidReplOriginId;
TransamVariables->oldestCommitTsXid = InvalidTransactionId;
TransamVariables->newestCommitTsXid = InvalidTransactionId;
Assert(hdr->magic == TWOPHASE_MAGIC);
hdr->total_len = records.total_len + sizeof(pg_crc32c);
- replorigin = (replorigin_session_origin != InvalidRepOriginId &&
+ replorigin = (replorigin_session_origin != InvalidReplOriginId &&
replorigin_session_origin != DoNotReplicateId);
if (replorigin)
continue;
PrepareRedoAdd(fxid, buf, InvalidXLogRecPtr,
- InvalidXLogRecPtr, InvalidRepOriginId);
+ InvalidXLogRecPtr, InvalidReplOriginId);
}
}
LWLockRelease(TwoPhaseStateLock);
* Are we using the replication origins feature? Or, in other words, are
* we replaying remote actions?
*/
- replorigin = (replorigin_session_origin != InvalidRepOriginId &&
+ replorigin = (replorigin_session_origin != InvalidReplOriginId &&
replorigin_session_origin != DoNotReplicateId);
/* Load the injection point before entering the critical section */
* Are we using the replication origins feature? Or, in other words, are
* we replaying remote actions?
*/
- replorigin = (replorigin_session_origin != InvalidRepOriginId &&
+ replorigin = (replorigin_session_origin != InvalidReplOriginId &&
replorigin_session_origin != DoNotReplicateId);
/*
void
PrepareRedoAdd(FullTransactionId fxid, char *buf,
XLogRecPtr start_lsn, XLogRecPtr end_lsn,
- RepOriginId origin_id)
+ ReplOriginId origin_id)
{
TwoPhaseFileHeader *hdr = (TwoPhaseFileHeader *) buf;
char *bufptr;
Assert(TwoPhaseState->numPrepXacts < max_prepared_xacts);
TwoPhaseState->prepXacts[TwoPhaseState->numPrepXacts++] = gxact;
- if (origin_id != InvalidRepOriginId)
+ if (origin_id != InvalidReplOriginId)
{
/* recover apply progress */
replorigin_advance(origin_id, hdr->origin_lsn, end_lsn,
* Are we using the replication origins feature? Or, in other words,
* are we replaying remote actions?
*/
- replorigin = (replorigin_session_origin != InvalidRepOriginId &&
+ replorigin = (replorigin_session_origin != InvalidReplOriginId &&
replorigin_session_origin != DoNotReplicateId);
/*
* Are we using the replication origins feature? Or, in other words, are
* we replaying remote actions?
*/
- replorigin = (replorigin_session_origin != InvalidRepOriginId &&
+ replorigin = (replorigin_session_origin != InvalidReplOriginId &&
replorigin_session_origin != DoNotReplicateId);
/* Fetch the data we need for the abort record */
}
/* dump transaction origin information */
- if (replorigin_session_origin != InvalidRepOriginId)
+ if (replorigin_session_origin != InvalidReplOriginId)
{
xl_xinfo.xinfo |= XACT_XINFO_HAS_ORIGIN;
* Dump transaction origin information. We need this during recovery to
* update the replication origin progress.
*/
- if (replorigin_session_origin != InvalidRepOriginId)
+ if (replorigin_session_origin != InvalidReplOriginId)
{
xl_xinfo.xinfo |= XACT_XINFO_HAS_ORIGIN;
xact_redo_commit(xl_xact_parsed_commit *parsed,
TransactionId xid,
XLogRecPtr lsn,
- RepOriginId origin_id)
+ ReplOriginId origin_id)
{
TransactionId max_xid;
TimestampTz commit_time;
AdvanceNextFullTransactionIdPastXid(max_xid);
Assert(((parsed->xinfo & XACT_XINFO_HAS_ORIGIN) == 0) ==
- (origin_id == InvalidRepOriginId));
+ (origin_id == InvalidReplOriginId));
if (parsed->xinfo & XACT_XINFO_HAS_ORIGIN)
commit_time = parsed->origin_timestamp;
*/
static void
xact_redo_abort(xl_xact_parsed_abort *parsed, TransactionId xid,
- XLogRecPtr lsn, RepOriginId origin_id)
+ XLogRecPtr lsn, ReplOriginId origin_id)
{
TransactionId max_xid;
static XLogRecData hdr_rdt;
static char *hdr_scratch = NULL;
-#define SizeOfXlogOrigin (sizeof(RepOriginId) + sizeof(char))
+#define SizeOfXlogOrigin (sizeof(ReplOriginId) + sizeof(char))
#define SizeOfXLogTransactionId (sizeof(TransactionId) + sizeof(char))
#define HEADER_SCRATCH_SIZE \
/* followed by the record's origin, if any */
if ((curinsert_flags & XLOG_INCLUDE_ORIGIN) &&
- replorigin_session_origin != InvalidRepOriginId)
+ replorigin_session_origin != InvalidReplOriginId)
{
*(scratch++) = (char) XLR_BLOCK_ID_ORIGIN;
memcpy(scratch, &replorigin_session_origin, sizeof(replorigin_session_origin));
decoded->header = *record;
decoded->lsn = lsn;
decoded->next = NULL;
- decoded->record_origin = InvalidRepOriginId;
+ decoded->record_origin = InvalidReplOriginId;
decoded->toplevel_xid = InvalidTransactionId;
decoded->main_data = NULL;
decoded->main_data_len = 0;
}
else if (block_id == XLR_BLOCK_ID_ORIGIN)
{
- COPY_HEADER_FIELD(&decoded->record_origin, sizeof(RepOriginId));
+ COPY_HEADER_FIELD(&decoded->record_origin, sizeof(ReplOriginId));
}
else if (block_id == XLR_BLOCK_ID_TOPLEVEL_XID)
{
*/
if (XLogRecPtrIsValid(opts.lsn))
{
- RepOriginId originid;
+ ReplOriginId originid;
char originname[NAMEDATALEN];
XLogRecPtr remote_lsn;
TransactionId oldestxmin,
TransactionId *delete_xid,
TimestampTz *delete_time,
- RepOriginId *delete_origin)
+ ReplOriginId *delete_origin)
{
BufferHeapTupleTableSlot *hslot;
HeapTuple tuple;
bool recently_dead = false;
TransactionId xmax;
TimestampTz localts;
- RepOriginId localorigin;
+ ReplOriginId localorigin;
hslot = (BufferHeapTupleTableSlot *) scanslot;
RelationFindDeletedTupleInfoSeq(Relation rel, TupleTableSlot *searchslot,
TransactionId oldestxmin,
TransactionId *delete_xid,
- RepOriginId *delete_origin,
+ ReplOriginId *delete_origin,
TimestampTz *delete_time)
{
TupleTableSlot *scanslot;
Assert(equalTupleDescs(desc, searchslot->tts_tupleDescriptor));
*delete_xid = InvalidTransactionId;
- *delete_origin = InvalidRepOriginId;
+ *delete_origin = InvalidReplOriginId;
*delete_time = 0;
/*
TupleTableSlot *searchslot,
TransactionId oldestxmin,
TransactionId *delete_xid,
- RepOriginId *delete_origin,
+ ReplOriginId *delete_origin,
TimestampTz *delete_time)
{
Relation idxrel;
*delete_xid = InvalidTransactionId;
*delete_time = 0;
- *delete_origin = InvalidRepOriginId;
+ *delete_origin = InvalidReplOriginId;
isIdxSafeToSkipDuplicates = (GetRelationIdentityOrPK(rel) == idxoid);
shm_mq *mq;
shm_mq_handle *mqh;
shm_mq_handle *error_mqh;
- RepOriginId originid;
+ ReplOriginId originid;
int worker_slot = DatumGetInt32(main_arg);
char originname[NAMEDATALEN];
TupleTableSlot *localslot,
TupleTableSlot *remoteslot,
Oid indexoid, TransactionId localxmin,
- RepOriginId localorigin,
+ ReplOriginId localorigin,
TimestampTz localts, StringInfo err_msg);
static void get_tuple_desc(EState *estate, ResultRelInfo *relinfo,
ConflictType type, char **key_desc,
*/
bool
GetTupleTransactionInfo(TupleTableSlot *localslot, TransactionId *xmin,
- RepOriginId *localorigin, TimestampTz *localts)
+ ReplOriginId *localorigin, TimestampTz *localts)
{
Datum xminDatum;
bool isnull;
*/
if (!track_commit_timestamp)
{
- *localorigin = InvalidRepOriginId;
+ *localorigin = InvalidReplOriginId;
*localts = 0;
return false;
}
ConflictType type, TupleTableSlot *searchslot,
TupleTableSlot *localslot, TupleTableSlot *remoteslot,
Oid indexoid, TransactionId localxmin,
- RepOriginId localorigin, TimestampTz localts,
+ ReplOriginId localorigin, TimestampTz localts,
StringInfo err_msg)
{
StringInfoData err_detail;
if (localts)
{
- if (localorigin == InvalidRepOriginId)
+ if (localorigin == InvalidReplOriginId)
appendStringInfo(&err_detail, _("Key already exists in unique index \"%s\", modified locally in transaction %u at %s"),
get_rel_name(indexoid),
localxmin, timestamptz_to_str(localts));
break;
case CT_UPDATE_ORIGIN_DIFFERS:
- if (localorigin == InvalidRepOriginId)
+ if (localorigin == InvalidReplOriginId)
appendStringInfo(&err_detail, _("Updating the row that was modified locally in transaction %u at %s"),
localxmin, timestamptz_to_str(localts));
else if (replorigin_by_oid(localorigin, true, &origin_name))
if (localts)
{
- if (localorigin == InvalidRepOriginId)
+ if (localorigin == InvalidReplOriginId)
appendStringInfo(&err_detail, _("The row to be updated was deleted locally in transaction %u at %s"),
localxmin, timestamptz_to_str(localts));
else if (replorigin_by_oid(localorigin, true, &origin_name))
break;
case CT_DELETE_ORIGIN_DIFFERS:
- if (localorigin == InvalidRepOriginId)
+ if (localorigin == InvalidReplOriginId)
appendStringInfo(&err_detail, _("Deleting the row that was modified locally in transaction %u at %s"),
localxmin, timestamptz_to_str(localts));
else if (replorigin_by_oid(localorigin, true, &origin_name))
TransactionId xid, const char *gid);
static bool DecodeTXNNeedSkip(LogicalDecodingContext *ctx,
XLogRecordBuffer *buf, Oid txn_dbid,
- RepOriginId origin_id);
+ ReplOriginId origin_id);
/*
* Take every XLogReadRecord()ed record and perform the actions required to
}
static inline bool
-FilterByOrigin(LogicalDecodingContext *ctx, RepOriginId origin_id)
+FilterByOrigin(LogicalDecodingContext *ctx, ReplOriginId origin_id)
{
if (ctx->callbacks.filter_by_origin_cb == NULL)
return false;
XLogReaderState *r = buf->record;
TransactionId xid = XLogRecGetXid(r);
uint8 info = XLogRecGetInfo(r) & ~XLR_INFO_MASK;
- RepOriginId origin_id = XLogRecGetOrigin(r);
+ ReplOriginId origin_id = XLogRecGetOrigin(r);
Snapshot snapshot = NULL;
xl_logical_message *message;
{
XLogRecPtr origin_lsn = InvalidXLogRecPtr;
TimestampTz commit_time = parsed->xact_time;
- RepOriginId origin_id = XLogRecGetOrigin(buf->record);
+ ReplOriginId origin_id = XLogRecGetOrigin(buf->record);
int i;
if (parsed->xinfo & XACT_XINFO_HAS_ORIGIN)
SnapBuild *builder = ctx->snapshot_builder;
XLogRecPtr origin_lsn = parsed->origin_lsn;
TimestampTz prepare_time = parsed->xact_time;
- RepOriginId origin_id = XLogRecGetOrigin(buf->record);
+ ReplOriginId origin_id = XLogRecGetOrigin(buf->record);
int i;
TransactionId xid = parsed->twophase_xid;
int i;
XLogRecPtr origin_lsn = InvalidXLogRecPtr;
TimestampTz abort_time = parsed->xact_time;
- RepOriginId origin_id = XLogRecGetOrigin(buf->record);
+ ReplOriginId origin_id = XLogRecGetOrigin(buf->record);
bool skip_xact;
if (parsed->xinfo & XACT_XINFO_HAS_ORIGIN)
*/
static bool
DecodeTXNNeedSkip(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
- Oid txn_dbid, RepOriginId origin_id)
+ Oid txn_dbid, ReplOriginId origin_id)
{
if (SnapBuildXactNeedsSkip(ctx->snapshot_builder, buf->origptr) ||
(txn_dbid != InvalidOid && txn_dbid != ctx->slot->data.database) ||
}
bool
-filter_by_origin_cb_wrapper(LogicalDecodingContext *ctx, RepOriginId origin_id)
+filter_by_origin_cb_wrapper(LogicalDecodingContext *ctx, ReplOriginId origin_id)
{
LogicalErrorCallbackState state;
ErrorContextCallback errcallback;
/*
* Local identifier for the remote node.
*/
- RepOriginId roident;
+ ReplOriginId roident;
/*
* Location of the latest commit from the remote side.
*/
typedef struct ReplicationStateOnDisk
{
- RepOriginId roident;
+ ReplOriginId roident;
XLogRecPtr remote_lsn;
} ReplicationStateOnDisk;
} ReplicationStateCtl;
/* external variables */
-RepOriginId replorigin_session_origin = InvalidRepOriginId; /* assumed identity */
+ReplOriginId replorigin_session_origin = InvalidReplOriginId; /* assumed identity */
XLogRecPtr replorigin_session_origin_lsn = InvalidXLogRecPtr;
TimestampTz replorigin_session_origin_timestamp = 0;
*
* Returns InvalidOid if the node isn't known yet and missing_ok is true.
*/
-RepOriginId
+ReplOriginId
replorigin_by_name(const char *roname, bool missing_ok)
{
Form_pg_replication_origin ident;
*
* Needs to be called in a transaction.
*/
-RepOriginId
+ReplOriginId
replorigin_create(const char *roname)
{
Oid roident;
* Helper function to drop a replication origin.
*/
static void
-replorigin_state_clear(RepOriginId roident, bool nowait)
+replorigin_state_clear(ReplOriginId roident, bool nowait)
{
int i;
}
/* then clear the in-memory slot */
- state->roident = InvalidRepOriginId;
+ state->roident = InvalidReplOriginId;
state->remote_lsn = InvalidXLogRecPtr;
state->local_lsn = InvalidXLogRecPtr;
break;
void
replorigin_drop_by_name(const char *name, bool missing_ok, bool nowait)
{
- RepOriginId roident;
+ ReplOriginId roident;
Relation rel;
HeapTuple tuple;
* Returns true if the origin is known, false otherwise.
*/
bool
-replorigin_by_oid(RepOriginId roident, bool missing_ok, char **roname)
+replorigin_by_oid(ReplOriginId roident, bool missing_ok, char **roname)
{
HeapTuple tuple;
Form_pg_replication_origin ric;
Assert(OidIsValid((Oid) roident));
- Assert(roident != InvalidRepOriginId);
+ Assert(roident != InvalidReplOriginId);
Assert(roident != DoNotReplicateId);
tuple = SearchSysCache1(REPLORIGIDENT,
ReplicationState *curstate = &replication_states[i];
XLogRecPtr local_lsn;
- if (curstate->roident == InvalidRepOriginId)
+ if (curstate->roident == InvalidReplOriginId)
continue;
/* zero, to avoid uninitialized padding bytes */
if (state->roident == xlrec->node_id)
{
/* reset entry */
- state->roident = InvalidRepOriginId;
+ state->roident = InvalidReplOriginId;
state->remote_lsn = InvalidXLogRecPtr;
state->local_lsn = InvalidXLogRecPtr;
break;
* unless running in recovery.
*/
void
-replorigin_advance(RepOriginId node,
+replorigin_advance(ReplOriginId node,
XLogRecPtr remote_commit, XLogRecPtr local_commit,
bool go_backward, bool wal_log)
{
ReplicationState *replication_state = NULL;
ReplicationState *free_state = NULL;
- Assert(node != InvalidRepOriginId);
+ Assert(node != InvalidReplOriginId);
/* we don't track DoNotReplicateId */
if (node == DoNotReplicateId)
ReplicationState *curstate = &replication_states[i];
/* remember where to insert if necessary */
- if (curstate->roident == InvalidRepOriginId &&
+ if (curstate->roident == InvalidReplOriginId &&
free_state == NULL)
{
free_state = curstate;
replication_state->roident = node;
}
- Assert(replication_state->roident != InvalidRepOriginId);
+ Assert(replication_state->roident != InvalidReplOriginId);
/*
* If somebody "forcefully" sets this slot, WAL log it, so it's durable
XLogRecPtr
-replorigin_get_progress(RepOriginId node, bool flush)
+replorigin_get_progress(ReplOriginId node, bool flush)
{
int i;
XLogRecPtr local_lsn = InvalidXLogRecPtr;
* acquired_by = PID of the first process.
*/
void
-replorigin_session_setup(RepOriginId node, int acquired_by)
+replorigin_session_setup(ReplOriginId node, int acquired_by)
{
static bool registered_cleanup;
int i;
ReplicationState *curstate = &replication_states[i];
/* remember where to insert if necessary */
- if (curstate->roident == InvalidRepOriginId &&
+ if (curstate->roident == InvalidReplOriginId &&
free_slot == -1)
{
free_slot = i;
}
- Assert(session_replication_state->roident != InvalidRepOriginId);
+ Assert(session_replication_state->roident != InvalidReplOriginId);
if (acquired_by == 0)
{
replorigin_session_advance(XLogRecPtr remote_commit, XLogRecPtr local_commit)
{
Assert(session_replication_state != NULL);
- Assert(session_replication_state->roident != InvalidRepOriginId);
+ Assert(session_replication_state->roident != InvalidReplOriginId);
LWLockAcquire(&session_replication_state->lock, LW_EXCLUSIVE);
if (session_replication_state->local_lsn < local_commit)
pg_replication_origin_create(PG_FUNCTION_ARGS)
{
char *name;
- RepOriginId roident;
+ ReplOriginId roident;
replorigin_check_prerequisites(false, false);
pg_replication_origin_oid(PG_FUNCTION_ARGS)
{
char *name;
- RepOriginId roident;
+ ReplOriginId roident;
replorigin_check_prerequisites(false, false);
pg_replication_origin_session_setup(PG_FUNCTION_ARGS)
{
char *name;
- RepOriginId origin;
+ ReplOriginId origin;
int pid;
replorigin_check_prerequisites(true, false);
replorigin_session_reset();
- replorigin_session_origin = InvalidRepOriginId;
+ replorigin_session_origin = InvalidReplOriginId;
replorigin_session_origin_lsn = InvalidXLogRecPtr;
replorigin_session_origin_timestamp = 0;
{
replorigin_check_prerequisites(false, false);
- PG_RETURN_BOOL(replorigin_session_origin != InvalidRepOriginId);
+ PG_RETURN_BOOL(replorigin_session_origin != InvalidReplOriginId);
}
{
text *name = PG_GETARG_TEXT_PP(0);
XLogRecPtr remote_commit = PG_GETARG_LSN(1);
- RepOriginId node;
+ ReplOriginId node;
replorigin_check_prerequisites(true, false);
{
char *name;
bool flush;
- RepOriginId roident;
+ ReplOriginId roident;
XLogRecPtr remote_lsn = InvalidXLogRecPtr;
replorigin_check_prerequisites(true, true);
state = &replication_states[i];
/* unused slot, nothing to display */
- if (state->roident == InvalidRepOriginId)
+ if (state->roident == InvalidReplOriginId)
continue;
memset(values, 0, sizeof(values));
ReorderBuffer *rb, TransactionId xid,
XLogRecPtr commit_lsn, XLogRecPtr end_lsn,
TimestampTz commit_time,
- RepOriginId origin_id, XLogRecPtr origin_lsn)
+ ReplOriginId origin_id, XLogRecPtr origin_lsn)
{
Snapshot snapshot_now;
CommandId command_id = FirstCommandId;
ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid,
XLogRecPtr commit_lsn, XLogRecPtr end_lsn,
TimestampTz commit_time,
- RepOriginId origin_id, XLogRecPtr origin_lsn)
+ ReplOriginId origin_id, XLogRecPtr origin_lsn)
{
ReorderBufferTXN *txn;
ReorderBufferRememberPrepareInfo(ReorderBuffer *rb, TransactionId xid,
XLogRecPtr prepare_lsn, XLogRecPtr end_lsn,
TimestampTz prepare_time,
- RepOriginId origin_id, XLogRecPtr origin_lsn)
+ ReplOriginId origin_id, XLogRecPtr origin_lsn)
{
ReorderBufferTXN *txn;
ReorderBufferFinishPrepared(ReorderBuffer *rb, TransactionId xid,
XLogRecPtr commit_lsn, XLogRecPtr end_lsn,
XLogRecPtr two_phase_at,
- TimestampTz commit_time, RepOriginId origin_id,
+ TimestampTz commit_time, ReplOriginId origin_id,
XLogRecPtr origin_lsn, char *gid, bool is_commit)
{
ReorderBufferTXN *txn;
* This is needed to allow the origin to be dropped.
*/
replorigin_session_reset();
- replorigin_session_origin = InvalidRepOriginId;
+ replorigin_session_origin = InvalidReplOriginId;
replorigin_session_origin_lsn = InvalidXLogRecPtr;
replorigin_session_origin_timestamp = 0;
AclResult aclresult;
WalRcvExecResult *res;
char originname[NAMEDATALEN];
- RepOriginId originid;
+ ReplOriginId originid;
UserContext ucxt;
bool must_use_password;
bool run_as_owner;
Oid localidxoid,
TupleTableSlot *remoteslot,
TransactionId *delete_xid,
- RepOriginId *delete_origin,
+ ReplOriginId *delete_origin,
TimestampTz *delete_time);
static void apply_handle_tuple_routing(ApplyExecutionData *edata,
TupleTableSlot *remoteslot,
static bool
FindDeletedTupleInLocalRel(Relation localrel, Oid localidxoid,
TupleTableSlot *remoteslot,
- TransactionId *delete_xid, RepOriginId *delete_origin,
+ TransactionId *delete_xid, ReplOriginId *delete_origin,
TimestampTz *delete_time)
{
TransactionId oldestxmin;
XLogRecPtr origin_startpos = InvalidXLogRecPtr;
char *slotname = NULL;
WalRcvStreamOptions options;
- RepOriginId originid;
+ ReplOriginId originid;
TimeLineID startpointTLI;
char *err;
bool must_use_password;
static void
replorigin_reset(int code, Datum arg)
{
- replorigin_session_origin = InvalidRepOriginId;
+ replorigin_session_origin = InvalidReplOriginId;
replorigin_session_origin_lsn = InvalidXLogRecPtr;
replorigin_session_origin_timestamp = 0;
}
bool transactional, const char *prefix,
Size sz, const char *message);
static bool pgoutput_origin_filter(LogicalDecodingContext *ctx,
- RepOriginId origin_id);
+ ReplOriginId origin_id);
static void pgoutput_begin_prepare_txn(LogicalDecodingContext *ctx,
ReorderBufferTXN *txn);
static void pgoutput_prepare_txn(LogicalDecodingContext *ctx,
static void publication_invalidation_cb(Datum arg, int cacheid,
uint32 hashvalue);
static void send_repl_origin(LogicalDecodingContext *ctx,
- RepOriginId origin_id, XLogRecPtr origin_lsn,
+ ReplOriginId origin_id, XLogRecPtr origin_lsn,
bool send_origin);
/*
static void
pgoutput_send_begin(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
{
- bool send_replication_origin = txn->origin_id != InvalidRepOriginId;
+ bool send_replication_origin = txn->origin_id != InvalidReplOriginId;
PGOutputTxnData *txndata = (PGOutputTxnData *) txn->output_plugin_private;
Assert(txndata);
static void
pgoutput_begin_prepare_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
{
- bool send_replication_origin = txn->origin_id != InvalidRepOriginId;
+ bool send_replication_origin = txn->origin_id != InvalidReplOriginId;
OutputPluginPrepareWrite(ctx, !send_replication_origin);
logicalrep_write_begin_prepare(ctx->out, txn);
*/
static bool
pgoutput_origin_filter(LogicalDecodingContext *ctx,
- RepOriginId origin_id)
+ ReplOriginId origin_id)
{
PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
- if (data->publish_no_origin && origin_id != InvalidRepOriginId)
+ if (data->publish_no_origin && origin_id != InvalidReplOriginId)
return true;
return false;
ReorderBufferTXN *txn)
{
PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
- bool send_replication_origin = txn->origin_id != InvalidRepOriginId;
+ bool send_replication_origin = txn->origin_id != InvalidReplOriginId;
/* we can't nest streaming of transactions */
Assert(!data->in_streaming);
/* Send Replication origin */
static void
-send_repl_origin(LogicalDecodingContext *ctx, RepOriginId origin_id,
+send_repl_origin(LogicalDecodingContext *ctx, ReplOriginId origin_id,
XLogRecPtr origin_lsn, bool send_origin)
{
if (send_origin)
Oid subid;
char *subname;
char originname[NAMEDATALEN];
- RepOriginId node;
+ ReplOriginId node;
XLogRecPtr remote_commit;
CHECK_IS_BINARY_UPGRADE;
bool failed = false;
int max_lrworkers;
- int max_reporigins;
+ int max_replorigins;
int max_wprocs;
pg_log_info("checking settings on subscriber");
disconnect_database(conn, true);
}
- max_reporigins = atoi(PQgetvalue(res, 0, 0));
+ max_replorigins = atoi(PQgetvalue(res, 0, 0));
max_lrworkers = atoi(PQgetvalue(res, 1, 0));
max_wprocs = atoi(PQgetvalue(res, 2, 0));
if (strcmp(PQgetvalue(res, 3, 0), "") != 0)
pg_log_debug("subscriber: max_logical_replication_workers: %d",
max_lrworkers);
- pg_log_debug("subscriber: max_active_replication_origins: %d", max_reporigins);
+ pg_log_debug("subscriber: max_active_replication_origins: %d", max_replorigins);
pg_log_debug("subscriber: max_worker_processes: %d", max_wprocs);
if (primary_slot_name)
pg_log_debug("subscriber: primary_slot_name: %s", primary_slot_name);
disconnect_database(conn, false);
- if (max_reporigins < num_dbs)
+ if (max_replorigins < num_dbs)
{
pg_log_error("subscriber requires %d active replication origins, but only %d remain",
- num_dbs, max_reporigins);
+ num_dbs, max_replorigins);
pg_log_error_hint("Increase the configuration parameter \"%s\" to at least %d.",
"max_active_replication_origins", num_dbs);
failed = true;
);
my $sub_oid = $old_sub->safe_psql('postgres',
"SELECT oid FROM pg_subscription WHERE subname = 'regress_sub3'");
-my $reporigin = 'pg_' . qq($sub_oid);
+my $replorigin = 'pg_' . qq($sub_oid);
$old_sub->safe_psql('postgres',
- "SELECT pg_replication_origin_drop('$reporigin')");
+ "SELECT pg_replication_origin_drop('$replorigin')");
$old_sub->stop;
extern void TransactionTreeSetCommitTsData(TransactionId xid, int nsubxids,
TransactionId *subxids, TimestampTz timestamp,
- RepOriginId nodeid);
+ ReplOriginId nodeid);
extern bool TransactionIdGetCommitTsData(TransactionId xid,
- TimestampTz *ts, RepOriginId *nodeid);
+ TimestampTz *ts, ReplOriginId *nodeid);
extern TransactionId GetLatestCommitTsData(TimestampTz *ts,
- RepOriginId *nodeid);
+ ReplOriginId *nodeid);
extern Size CommitTsShmemSize(void);
extern void CommitTsShmemInit(void);
extern void PrepareRedoAdd(FullTransactionId fxid, char *buf,
XLogRecPtr start_lsn, XLogRecPtr end_lsn,
- RepOriginId origin_id);
+ ReplOriginId origin_id);
extern void PrepareRedoRemove(TransactionId xid, bool giveWarning);
extern void restoreTwoPhaseData(void);
extern bool LookupGXact(const char *gid, XLogRecPtr prepare_end_lsn,
* Replication origin id - this is located in this file to avoid having to
* include origin.h in a bunch of xlog related places.
*/
-typedef uint16 RepOriginId;
+typedef uint16 ReplOriginId;
/*
* This chunk of hackery attempts to determine which file sync methods
XLogRecPtr lsn; /* location */
XLogRecPtr next_lsn; /* location of next record */
XLogRecord header; /* header */
- RepOriginId record_origin;
+ ReplOriginId record_origin;
TransactionId toplevel_xid; /* XID of top-level transaction */
char *main_data; /* record's main data portion */
uint32 main_data_len; /* main data portion's length */
TupleTableSlot *searchslot,
TransactionId oldestxmin,
TransactionId *delete_xid,
- RepOriginId *delete_origin,
+ ReplOriginId *delete_origin,
TimestampTz *delete_time);
extern bool RelationFindDeletedTupleInfoByIndex(Relation rel, Oid idxoid,
TupleTableSlot *searchslot,
TransactionId oldestxmin,
TransactionId *delete_xid,
- RepOriginId *delete_origin,
+ ReplOriginId *delete_origin,
TimestampTz *delete_time);
extern void ExecSimpleRelationInsert(ResultRelInfo *resultRelInfo,
EState *estate, TupleTableSlot *slot);
* occurred */
TransactionId xmin; /* transaction ID of the modification causing
* the conflict */
- RepOriginId origin; /* origin identifier of the modification */
+ ReplOriginId origin; /* origin identifier of the modification */
TimestampTz ts; /* timestamp of when the modification on the
* conflicting local row occurred */
} ConflictTupleInfo;
extern bool GetTupleTransactionInfo(TupleTableSlot *localslot,
TransactionId *xmin,
- RepOriginId *localorigin,
+ ReplOriginId *localorigin,
TimestampTz *localts);
extern void ReportApplyConflict(EState *estate, ResultRelInfo *relinfo,
int elevel, ConflictType type,
extern bool filter_prepare_cb_wrapper(LogicalDecodingContext *ctx,
TransactionId xid, const char *gid);
-extern bool filter_by_origin_cb_wrapper(LogicalDecodingContext *ctx, RepOriginId origin_id);
+extern bool filter_by_origin_cb_wrapper(LogicalDecodingContext *ctx, ReplOriginId origin_id);
extern void ResetLogicalStreamingState(void);
extern void UpdateDecodingStats(LogicalDecodingContext *ctx);
typedef struct xl_replorigin_set
{
XLogRecPtr remote_lsn;
- RepOriginId node_id;
+ ReplOriginId node_id;
bool force;
} xl_replorigin_set;
typedef struct xl_replorigin_drop
{
- RepOriginId node_id;
+ ReplOriginId node_id;
} xl_replorigin_drop;
#define XLOG_REPLORIGIN_SET 0x00
#define XLOG_REPLORIGIN_DROP 0x10
-#define InvalidRepOriginId 0
+#define InvalidReplOriginId 0
#define DoNotReplicateId PG_UINT16_MAX
/*
*/
#define MAX_RONAME_LEN 512
-extern PGDLLIMPORT RepOriginId replorigin_session_origin;
+extern PGDLLIMPORT ReplOriginId replorigin_session_origin;
extern PGDLLIMPORT XLogRecPtr replorigin_session_origin_lsn;
extern PGDLLIMPORT TimestampTz replorigin_session_origin_timestamp;
extern PGDLLIMPORT int max_active_replication_origins;
/* API for querying & manipulating replication origins */
-extern RepOriginId replorigin_by_name(const char *roname, bool missing_ok);
-extern RepOriginId replorigin_create(const char *roname);
+extern ReplOriginId replorigin_by_name(const char *roname, bool missing_ok);
+extern ReplOriginId replorigin_create(const char *roname);
extern void replorigin_drop_by_name(const char *name, bool missing_ok, bool nowait);
-extern bool replorigin_by_oid(RepOriginId roident, bool missing_ok,
+extern bool replorigin_by_oid(ReplOriginId roident, bool missing_ok,
char **roname);
/* API for querying & manipulating replication progress tracking */
-extern void replorigin_advance(RepOriginId node,
+extern void replorigin_advance(ReplOriginId node,
XLogRecPtr remote_commit,
XLogRecPtr local_commit,
bool go_backward, bool wal_log);
-extern XLogRecPtr replorigin_get_progress(RepOriginId node, bool flush);
+extern XLogRecPtr replorigin_get_progress(ReplOriginId node, bool flush);
extern void replorigin_session_advance(XLogRecPtr remote_commit,
XLogRecPtr local_commit);
-extern void replorigin_session_setup(RepOriginId node, int acquired_by);
+extern void replorigin_session_setup(ReplOriginId node, int acquired_by);
extern void replorigin_session_reset(void);
extern XLogRecPtr replorigin_session_get_progress(bool flush);
* Filter changes by origin.
*/
typedef bool (*LogicalDecodeFilterByOriginCB) (struct LogicalDecodingContext *ctx,
- RepOriginId origin_id);
+ ReplOriginId origin_id);
/*
* Called to shutdown an output plugin.
/* Transaction this change belongs to. */
struct ReorderBufferTXN *txn;
- RepOriginId origin_id;
+ ReplOriginId origin_id;
/*
* Context data for the change. Which part of the union is valid depends
XLogRecPtr restart_decoding_lsn;
/* origin of the change that caused this transaction */
- RepOriginId origin_id;
+ ReplOriginId origin_id;
XLogRecPtr origin_lsn;
/*
Size message_size, const char *message);
extern void ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid,
XLogRecPtr commit_lsn, XLogRecPtr end_lsn,
- TimestampTz commit_time, RepOriginId origin_id, XLogRecPtr origin_lsn);
+ TimestampTz commit_time, ReplOriginId origin_id, XLogRecPtr origin_lsn);
extern void ReorderBufferFinishPrepared(ReorderBuffer *rb, TransactionId xid,
XLogRecPtr commit_lsn, XLogRecPtr end_lsn,
XLogRecPtr two_phase_at,
TimestampTz commit_time,
- RepOriginId origin_id, XLogRecPtr origin_lsn,
+ ReplOriginId origin_id, XLogRecPtr origin_lsn,
char *gid, bool is_commit);
extern void ReorderBufferAssignChild(ReorderBuffer *rb, TransactionId xid,
TransactionId subxid, XLogRecPtr lsn);
extern bool ReorderBufferRememberPrepareInfo(ReorderBuffer *rb, TransactionId xid,
XLogRecPtr prepare_lsn, XLogRecPtr end_lsn,
TimestampTz prepare_time,
- RepOriginId origin_id, XLogRecPtr origin_lsn);
+ ReplOriginId origin_id, XLogRecPtr origin_lsn);
extern void ReorderBufferSkipPrepare(ReorderBuffer *rb, TransactionId xid);
extern void ReorderBufferPrepare(ReorderBuffer *rb, TransactionId xid, char *gid);
extern ReorderBufferTXN *ReorderBufferGetOldestTXN(ReorderBuffer *rb);
ReorderBufferTupleCidKey
ReorderBufferUpdateProgressTxnCB
ReorderTuple
-RepOriginId
ReparameterizeForeignPathByChild_function
+ReplOriginId
ReplaceVarsFromTargetList_context
ReplaceVarsNoMatchOption
ReplaceWrapOption