<structfield>subretaindeadtuples</structfield> <type>bool</type>
</para>
<para>
- If true, the information (e.g., dead tuples, commit timestamps, and
+ If true, the detection of <xref linkend="conflict-update-deleted"/> is
+ enabled and the information (e.g., dead tuples, commit timestamps, and
origins) on the subscriber that is useful for conflict detection is
retained.
</para></entry>
</para>
</listitem>
</varlistentry>
+ <varlistentry id="conflict-update-deleted" xreflabel="update_deleted">
+ <term><literal>update_deleted</literal></term>
+ <listitem>
+ <para>
+ The tuple to be updated was concurrently deleted by another origin. The
+ update will simply be skipped in this scenario. Note that this conflict
+ can only be detected when
+ <link linkend="guc-track-commit-timestamp"><varname>track_commit_timestamp</varname></link>
+ and <link linkend="sql-createsubscription-params-with-retain-dead-tuples"><literal>retain_dead_tuples</literal></link>
+ are enabled. Note that if a tuple cannot be found due to the table being
+ truncated, only a <literal>update_missing</literal> conflict will
+ arise. Additionally, if the tuple was deleted by the same origin, an
+ <literal>update_missing</literal> conflict will arise.
+ </para>
+ </listitem>
+ </varlistentry>
<varlistentry id="conflict-update-missing" xreflabel="update_missing">
<term><literal>update_missing</literal></term>
<listitem>
</para></entry>
</row>
+ <row>
+ <entry role="catalog_table_entry"><para role="column_definition">
+ <structfield>confl_update_deleted</structfield> <type>bigint</type>
+ </para>
+ <para>
+ Number of times the tuple to be updated was concurrently deleted by
+ another source during the application of changes. See <xref linkend="conflict-update-deleted"/>
+ for details about this conflict.
+ </para></entry>
+ </row>
+
<row>
<entry role="catalog_table_entry"><para role="column_definition">
<structfield>confl_update_missing</structfield> <type>bigint</type>
Specifies whether the information (e.g., dead tuples, commit
timestamps, and origins) required for conflict detection on the
subscriber is retained. The default is <literal>false</literal>.
- If set to <literal>true</literal>, a physical replication slot named
- <quote><literal>pg_conflict_detection</literal></quote> will be
- created on the subscriber to prevent the conflict information from
- being removed.
+ If set to <literal>true</literal>, the detection of
+ <xref linkend="conflict-update-deleted"/> is enabled, and a physical
+ replication slot named <quote><literal>pg_conflict_detection</literal></quote>
+ created on the subscriber to prevent the information for detecting
+ conflicts from being removed.
</para>
<para>
ss.confl_insert_exists,
ss.confl_update_origin_differs,
ss.confl_update_exists,
+ ss.confl_update_deleted,
ss.confl_update_missing,
ss.confl_delete_origin_differs,
ss.confl_delete_missing,
#include "postgres.h"
+#include "access/commit_ts.h"
#include "access/genam.h"
#include "access/gist.h"
#include "access/relscan.h"
#include "access/tableam.h"
#include "access/transam.h"
#include "access/xact.h"
+#include "access/heapam.h"
#include "catalog/pg_am_d.h"
#include "commands/trigger.h"
#include "executor/executor.h"
static bool tuples_equal(TupleTableSlot *slot1, TupleTableSlot *slot2,
- TypeCacheEntry **eq);
+ TypeCacheEntry **eq, Bitmapset *columns);
/*
* Setup a ScanKey for a search in the relation 'rel' for a tuple 'key' that
if (eq == NULL)
eq = palloc0(sizeof(*eq) * outslot->tts_tupleDescriptor->natts);
- if (!tuples_equal(outslot, searchslot, eq))
+ if (!tuples_equal(outslot, searchslot, eq, NULL))
continue;
}
/*
* Compare the tuples in the slots by checking if they have equal values.
+ *
+ * If 'columns' is not null, only the columns specified within it will be
+ * considered for the equality check, ignoring all other columns.
*/
static bool
tuples_equal(TupleTableSlot *slot1, TupleTableSlot *slot2,
- TypeCacheEntry **eq)
+ TypeCacheEntry **eq, Bitmapset *columns)
{
int attrnum;
if (att->attisdropped || att->attgenerated)
continue;
+ /*
+ * Ignore columns that are not listed for checking.
+ */
+ if (columns &&
+ !bms_is_member(att->attnum - FirstLowInvalidHeapAttributeNumber,
+ columns))
+ continue;
+
/*
* If one value is NULL and other is not, then they are certainly not
* equal
/* Try to find the tuple */
while (table_scan_getnextslot(scan, ForwardScanDirection, scanslot))
{
- if (!tuples_equal(scanslot, searchslot, eq))
+ if (!tuples_equal(scanslot, searchslot, eq, NULL))
continue;
found = true;
}
}
+/*
+ * If the tuple is recently dead and was deleted by a transaction with a newer
+ * commit timestamp than previously recorded, update the associated transaction
+ * ID, commit time, and origin. This helps ensure that conflict detection uses
+ * the most recent and relevant deletion metadata.
+ */
+static void
+update_most_recent_deletion_info(TupleTableSlot *scanslot,
+ TransactionId oldestxmin,
+ TransactionId *delete_xid,
+ TimestampTz *delete_time,
+ RepOriginId *delete_origin)
+{
+ BufferHeapTupleTableSlot *hslot;
+ HeapTuple tuple;
+ Buffer buf;
+ bool recently_dead = false;
+ TransactionId xmax;
+ TimestampTz localts;
+ RepOriginId localorigin;
+
+ hslot = (BufferHeapTupleTableSlot *) scanslot;
+
+ tuple = ExecFetchSlotHeapTuple(scanslot, false, NULL);
+ buf = hslot->buffer;
+
+ LockBuffer(buf, BUFFER_LOCK_SHARE);
+
+ /*
+ * We do not consider HEAPTUPLE_DEAD status because it indicates either
+ * tuples whose inserting transaction was aborted (meaning there is no
+ * commit timestamp or origin), or tuples deleted by a transaction older
+ * than oldestxmin, making it safe to ignore them during conflict
+ * detection (See comments atop worker.c for details).
+ */
+ if (HeapTupleSatisfiesVacuum(tuple, oldestxmin, buf) == HEAPTUPLE_RECENTLY_DEAD)
+ recently_dead = true;
+
+ LockBuffer(buf, BUFFER_LOCK_UNLOCK);
+
+ if (!recently_dead)
+ return;
+
+ xmax = HeapTupleHeaderGetUpdateXid(tuple->t_data);
+ if (!TransactionIdIsValid(xmax))
+ return;
+
+ /* Select the dead tuple with the most recent commit timestamp */
+ if (TransactionIdGetCommitTsData(xmax, &localts, &localorigin) &&
+ TimestampDifferenceExceeds(*delete_time, localts, 0))
+ {
+ *delete_xid = xmax;
+ *delete_time = localts;
+ *delete_origin = localorigin;
+ }
+}
+
+/*
+ * Searches the relation 'rel' for the most recently deleted tuple that matches
+ * the values in 'searchslot' and is not yet removable by VACUUM. The function
+ * returns the transaction ID, origin, and commit timestamp of the transaction
+ * that deleted this tuple.
+ *
+ * 'oldestxmin' acts as a cutoff transaction ID. Tuples deleted by transactions
+ * with IDs >= 'oldestxmin' are considered recently dead and are eligible for
+ * conflict detection.
+ *
+ * Instead of stopping at the first match, we scan all matching dead tuples to
+ * identify most recent deletion. This is crucial because only the latest
+ * deletion is relevant for resolving conflicts.
+ *
+ * For example, consider a scenario on the subscriber where a row is deleted,
+ * re-inserted, and then deleted again only on the subscriber:
+ *
+ * - (pk, 1) - deleted at 9:00,
+ * - (pk, 1) - deleted at 9:02,
+ *
+ * Now, a remote update arrives: (pk, 1) -> (pk, 2), timestamped at 9:01.
+ *
+ * If we mistakenly return the older deletion (9:00), the system may wrongly
+ * apply the remote update using a last-update-wins strategy. Instead, we must
+ * recognize the more recent deletion at 9:02 and skip the update. See
+ * comments atop worker.c for details. Note, as of now, conflict resolution
+ * is not implemented. Consequently, the system may incorrectly report the
+ * older tuple as the conflicted one, leading to misleading results.
+ *
+ * The commit timestamp of the deleting transaction is used to determine which
+ * tuple was deleted most recently.
+ */
+bool
+RelationFindDeletedTupleInfoSeq(Relation rel, TupleTableSlot *searchslot,
+ TransactionId oldestxmin,
+ TransactionId *delete_xid,
+ RepOriginId *delete_origin,
+ TimestampTz *delete_time)
+{
+ TupleTableSlot *scanslot;
+ TableScanDesc scan;
+ TypeCacheEntry **eq;
+ Bitmapset *indexbitmap;
+ TupleDesc desc PG_USED_FOR_ASSERTS_ONLY = RelationGetDescr(rel);
+
+ Assert(equalTupleDescs(desc, searchslot->tts_tupleDescriptor));
+
+ *delete_xid = InvalidTransactionId;
+ *delete_origin = InvalidRepOriginId;
+ *delete_time = 0;
+
+ /*
+ * If the relation has a replica identity key or a primary key that is
+ * unusable for locating deleted tuples (see
+ * IsIndexUsableForFindingDeletedTuple), a full table scan becomes
+ * necessary. In such cases, comparing the entire tuple is not required,
+ * since the remote tuple might not include all column values. Instead,
+ * the indexed columns alone are suffcient to identify the target tuple
+ * (see logicalrep_rel_mark_updatable).
+ */
+ indexbitmap = RelationGetIndexAttrBitmap(rel,
+ INDEX_ATTR_BITMAP_IDENTITY_KEY);
+
+ /* fallback to PK if no replica identity */
+ if (!indexbitmap)
+ indexbitmap = RelationGetIndexAttrBitmap(rel,
+ INDEX_ATTR_BITMAP_PRIMARY_KEY);
+
+ eq = palloc0(sizeof(*eq) * searchslot->tts_tupleDescriptor->natts);
+
+ /*
+ * Start a heap scan using SnapshotAny to identify dead tuples that are
+ * not visible under a standard MVCC snapshot. Tuples from transactions
+ * not yet committed or those just committed prior to the scan are
+ * excluded in update_most_recent_deletion_info().
+ */
+ scan = table_beginscan(rel, SnapshotAny, 0, NULL);
+ scanslot = table_slot_create(rel, NULL);
+
+ table_rescan(scan, NULL);
+
+ /* Try to find the tuple */
+ while (table_scan_getnextslot(scan, ForwardScanDirection, scanslot))
+ {
+ if (!tuples_equal(scanslot, searchslot, eq, indexbitmap))
+ continue;
+
+ update_most_recent_deletion_info(scanslot, oldestxmin, delete_xid,
+ delete_time, delete_origin);
+ }
+
+ table_endscan(scan);
+ ExecDropSingleTupleTableSlot(scanslot);
+
+ return *delete_time != 0;
+}
+
+/*
+ * Similar to RelationFindDeletedTupleInfoSeq() but using index scan to locate
+ * the deleted tuple.
+ */
+bool
+RelationFindDeletedTupleInfoByIndex(Relation rel, Oid idxoid,
+ TupleTableSlot *searchslot,
+ TransactionId oldestxmin,
+ TransactionId *delete_xid,
+ RepOriginId *delete_origin,
+ TimestampTz *delete_time)
+{
+ Relation idxrel;
+ ScanKeyData skey[INDEX_MAX_KEYS];
+ int skey_attoff;
+ IndexScanDesc scan;
+ TupleTableSlot *scanslot;
+ TypeCacheEntry **eq = NULL;
+ bool isIdxSafeToSkipDuplicates;
+ TupleDesc desc PG_USED_FOR_ASSERTS_ONLY = RelationGetDescr(rel);
+
+ Assert(equalTupleDescs(desc, searchslot->tts_tupleDescriptor));
+ Assert(OidIsValid(idxoid));
+
+ *delete_xid = InvalidTransactionId;
+ *delete_time = 0;
+ *delete_origin = InvalidRepOriginId;
+
+ isIdxSafeToSkipDuplicates = (GetRelationIdentityOrPK(rel) == idxoid);
+
+ scanslot = table_slot_create(rel, NULL);
+
+ idxrel = index_open(idxoid, RowExclusiveLock);
+
+ /* Build scan key. */
+ skey_attoff = build_replindex_scan_key(skey, rel, idxrel, searchslot);
+
+ /*
+ * Start an index scan using SnapshotAny to identify dead tuples that are
+ * not visible under a standard MVCC snapshot. Tuples from transactions
+ * not yet committed or those just committed prior to the scan are
+ * excluded in update_most_recent_deletion_info().
+ */
+ scan = index_beginscan(rel, idxrel, SnapshotAny, NULL, skey_attoff, 0);
+
+ index_rescan(scan, skey, skey_attoff, NULL, 0);
+
+ /* Try to find the tuple */
+ while (index_getnext_slot(scan, ForwardScanDirection, scanslot))
+ {
+ /*
+ * Avoid expensive equality check if the index is primary key or
+ * replica identity index.
+ */
+ if (!isIdxSafeToSkipDuplicates)
+ {
+ if (eq == NULL)
+ eq = palloc0(sizeof(*eq) * scanslot->tts_tupleDescriptor->natts);
+
+ if (!tuples_equal(scanslot, searchslot, eq, NULL))
+ continue;
+ }
+
+ update_most_recent_deletion_info(scanslot, oldestxmin, delete_xid,
+ delete_time, delete_origin);
+ }
+
+ index_endscan(scan);
+
+ index_close(idxrel, NoLock);
+
+ ExecDropSingleTupleTableSlot(scanslot);
+
+ return *delete_time != 0;
+}
+
/*
* Find the tuple that violates the passed unique index (conflictindex).
*
[CT_UPDATE_EXISTS] = "update_exists",
[CT_UPDATE_MISSING] = "update_missing",
[CT_DELETE_ORIGIN_DIFFERS] = "delete_origin_differs",
+ [CT_UPDATE_DELETED] = "update_deleted",
[CT_DELETE_MISSING] = "delete_missing",
[CT_MULTIPLE_UNIQUE_CONFLICTS] = "multiple_unique_conflicts"
};
case CT_UPDATE_ORIGIN_DIFFERS:
case CT_UPDATE_MISSING:
case CT_DELETE_ORIGIN_DIFFERS:
+ case CT_UPDATE_DELETED:
case CT_DELETE_MISSING:
return errcode(ERRCODE_T_R_SERIALIZATION_FAILURE);
}
break;
+ case CT_UPDATE_DELETED:
+ if (localts)
+ {
+ if (localorigin == InvalidRepOriginId)
+ appendStringInfo(&err_detail, _("The row to be updated was deleted locally in transaction %u at %s."),
+ localxmin, timestamptz_to_str(localts));
+ else if (replorigin_by_oid(localorigin, true, &origin_name))
+ appendStringInfo(&err_detail, _("The row to be updated was deleted by a different origin \"%s\" in transaction %u at %s."),
+ origin_name, localxmin, timestamptz_to_str(localts));
+
+ /* The origin that modified this row has been removed. */
+ else
+ appendStringInfo(&err_detail, _("The row to be updated was deleted by a non-existent origin in transaction %u at %s."),
+ localxmin, timestamptz_to_str(localts));
+ }
+ else
+ appendStringInfo(&err_detail, _("The row to be updated was deleted."));
+
+ break;
+
case CT_UPDATE_MISSING:
appendStringInfoString(&err_detail, _("Could not find the row to be updated."));
break;
* Each apply worker that enabled retain_dead_tuples option maintains a
* non-removable transaction ID (oldest_nonremovable_xid) in shared memory to
* prevent dead rows from being removed prematurely when the apply worker still
- * needs them to detect conflicts reliably. This helps to retain the required
- * commit_ts module information, which further helps to detect
- * update_origin_differs and delete_origin_differs conflicts reliably, as
+ * needs them to detect update_deleted conflicts. Additionally, this helps to
+ * retain the required commit_ts module information, which further helps to
+ * detect update_origin_differs and delete_origin_differs conflicts reliably, as
* otherwise, vacuum freeze could remove the required information.
*
* The logical replication launcher manages an internal replication slot named
* transactions that occurred concurrently with the tuple DELETE, any
* subsequent UPDATE from a remote node should have a later timestamp. In such
* cases, it is acceptable to detect an update_missing scenario and convert the
- * UPDATE to an INSERT when applying it. But, detecting concurrent remote
- * transactions with earlier timestamps than the DELETE is necessary, as the
- * UPDATEs in remote transactions should be ignored if their timestamp is
- * earlier than that of the dead tuples.
+ * UPDATE to an INSERT when applying it. But, for concurrent remote
+ * transactions with earlier timestamps than the DELETE, detecting
+ * update_deleted is necessary, as the UPDATEs in remote transactions should be
+ * ignored if their timestamp is earlier than that of the dead tuples.
*
* Note that advancing the non-removable transaction ID is not supported if the
* publisher is also a physical standby. This is because the logical walsender
Oid localidxoid,
TupleTableSlot *remoteslot,
TupleTableSlot **localslot);
+static bool FindDeletedTupleInLocalRel(Relation localrel,
+ Oid localidxoid,
+ TupleTableSlot *remoteslot,
+ TransactionId *delete_xid,
+ RepOriginId *delete_origin,
+ TimestampTz *delete_time);
static void apply_handle_tuple_routing(ApplyExecutionData *edata,
TupleTableSlot *remoteslot,
LogicalRepTupleData *newtup,
}
else
{
+ ConflictType type;
TupleTableSlot *newslot = localslot;
+ /*
+ * Detecting whether the tuple was recently deleted or never existed
+ * is crucial to avoid misleading the user during confict handling.
+ */
+ if (FindDeletedTupleInLocalRel(localrel, localindexoid, remoteslot,
+ &conflicttuple.xmin,
+ &conflicttuple.origin,
+ &conflicttuple.ts) &&
+ conflicttuple.origin != replorigin_session_origin)
+ type = CT_UPDATE_DELETED;
+ else
+ type = CT_UPDATE_MISSING;
+
/* Store the new tuple for conflict reporting */
slot_store_data(newslot, relmapentry, newtup);
/*
- * The tuple to be updated could not be found. Do nothing except for
- * emitting a log message.
+ * The tuple to be updated could not be found or was deleted. Do
+ * nothing except for emitting a log message.
*/
- ReportApplyConflict(estate, relinfo, LOG, CT_UPDATE_MISSING,
- remoteslot, newslot, list_make1(&conflicttuple));
+ ReportApplyConflict(estate, relinfo, LOG, type, remoteslot, newslot,
+ list_make1(&conflicttuple));
}
/* Cleanup. */
return found;
}
+/*
+ * Determine whether the index can reliably locate the deleted tuple in the
+ * local relation.
+ *
+ * An index may exclude deleted tuples if it was re-indexed or re-created during
+ * change application. Therefore, an index is considered usable only if the
+ * conflict detection slot.xmin (conflict_detection_xmin) is greater than the
+ * index tuple's xmin. This ensures that any tuples deleted prior to the index
+ * creation or re-indexing are not relevant for conflict detection in the
+ * current apply worker.
+ *
+ * Note that indexes may also be excluded if they were modified by other DDL
+ * operations, such as ALTER INDEX. However, this is acceptable, as the
+ * likelihood of such DDL changes coinciding with the need to scan dead
+ * tuples for the update_deleted is low.
+ */
+static bool
+IsIndexUsableForFindingDeletedTuple(Oid localindexoid,
+ TransactionId conflict_detection_xmin)
+{
+ HeapTuple index_tuple;
+ TransactionId index_xmin;
+
+ index_tuple = SearchSysCache1(INDEXRELID, ObjectIdGetDatum(localindexoid));
+
+ if (!HeapTupleIsValid(index_tuple)) /* should not happen */
+ elog(ERROR, "cache lookup failed for index %u", localindexoid);
+
+ /*
+ * No need to check for a frozen transaction ID, as
+ * TransactionIdPrecedes() manages it internally, treating it as falling
+ * behind the conflict_detection_xmin.
+ */
+ index_xmin = HeapTupleHeaderGetXmin(index_tuple->t_data);
+
+ ReleaseSysCache(index_tuple);
+
+ return TransactionIdPrecedes(index_xmin, conflict_detection_xmin);
+}
+
+/*
+ * Attempts to locate a deleted tuple in the local relation that matches the
+ * values of the tuple received from the publication side (in 'remoteslot').
+ * The search is performed using either the replica identity index, primary
+ * key, other available index, or a sequential scan if necessary.
+ *
+ * Returns true if the deleted tuple is found. If found, the transaction ID,
+ * origin, and commit timestamp of the deletion are stored in '*delete_xid',
+ * '*delete_origin', and '*delete_time' respectively.
+ */
+static bool
+FindDeletedTupleInLocalRel(Relation localrel, Oid localidxoid,
+ TupleTableSlot *remoteslot,
+ TransactionId *delete_xid, RepOriginId *delete_origin,
+ TimestampTz *delete_time)
+{
+ TransactionId oldestxmin;
+ ReplicationSlot *slot;
+
+ /*
+ * Return false if either dead tuples are not retained or commit timestamp
+ * data is not available.
+ */
+ if (!MySubscription->retaindeadtuples || !track_commit_timestamp)
+ return false;
+
+ /*
+ * For conflict detection, we use the conflict slot's xmin value instead
+ * of invoking GetOldestNonRemovableTransactionId(). The slot.xmin acts as
+ * a threshold to identify tuples that were recently deleted. These tuples
+ * are not visible to concurrent transactions, but we log an
+ * update_deleted conflict if such a tuple matches the remote update being
+ * applied.
+ *
+ * Although GetOldestNonRemovableTransactionId() can return a value older
+ * than the slot's xmin, for our current purpose it is acceptable to treat
+ * tuples deleted by transactions prior to slot.xmin as update_missing
+ * conflicts.
+ *
+ * Ideally, we would use oldest_nonremovable_xid, which is directly
+ * maintained by the leader apply worker. However, this value is not
+ * available to table synchronization or parallel apply workers, making
+ * slot.xmin a practical alternative in those contexts.
+ */
+ slot = SearchNamedReplicationSlot(CONFLICT_DETECTION_SLOT, true);
+
+ Assert(slot);
+
+ SpinLockAcquire(&slot->mutex);
+ oldestxmin = slot->data.xmin;
+ SpinLockRelease(&slot->mutex);
+
+ Assert(TransactionIdIsValid(oldestxmin));
+
+ if (OidIsValid(localidxoid) &&
+ IsIndexUsableForFindingDeletedTuple(localidxoid, oldestxmin))
+ return RelationFindDeletedTupleInfoByIndex(localrel, localidxoid,
+ remoteslot, oldestxmin,
+ delete_xid, delete_origin,
+ delete_time);
+ else
+ return RelationFindDeletedTupleInfoSeq(localrel, remoteslot,
+ oldestxmin, delete_xid,
+ delete_origin, delete_time);
+}
+
/*
* This handles insert, update, delete on a partitioned table.
*/
remoteslot_part, &localslot);
if (!found)
{
+ ConflictType type;
TupleTableSlot *newslot = localslot;
+ /*
+ * Detecting whether the tuple was recently deleted or
+ * never existed is crucial to avoid misleading the user
+ * during confict handling.
+ */
+ if (FindDeletedTupleInLocalRel(partrel,
+ part_entry->localindexoid,
+ remoteslot_part,
+ &conflicttuple.xmin,
+ &conflicttuple.origin,
+ &conflicttuple.ts) &&
+ conflicttuple.origin != replorigin_session_origin)
+ type = CT_UPDATE_DELETED;
+ else
+ type = CT_UPDATE_MISSING;
+
/* Store the new tuple for conflict reporting */
slot_store_data(newslot, part_entry, newtup);
/*
- * The tuple to be updated could not be found. Do nothing
- * except for emitting a log message.
+ * The tuple to be updated could not be found or was
+ * deleted. Do nothing except for emitting a log message.
*/
ReportApplyConflict(estate, partrelinfo, LOG,
- CT_UPDATE_MISSING, remoteslot_part,
- newslot, list_make1(&conflicttuple));
+ type, remoteslot_part, newslot,
+ list_make1(&conflicttuple));
return;
}
{
/*
* It is sufficient to manage non-removable transaction ID for a
- * subscription by the main apply worker to detect conflicts reliably even
- * for table sync or parallel apply workers.
+ * subscription by the main apply worker to detect update_deleted reliably
+ * even for table sync or parallel apply workers.
*/
if (!am_leader_apply_worker())
return false;
* We expect the publisher and subscriber clocks to be in sync using time
* sync service like NTP. Otherwise, we will advance this worker's
* oldest_nonremovable_xid prematurely, leading to the removal of rows
- * required to detect conflicts reliably. This check primarily addresses
- * scenarios where the publisher's clock falls behind; if the publisher's
- * clock is ahead, subsequent transactions will naturally bear later
- * commit timestamps, conforming to the design outlined atop worker.c.
+ * required to detect update_deleted reliably. This check primarily
+ * addresses scenarios where the publisher's clock falls behind; if the
+ * publisher's clock is ahead, subsequent transactions will naturally bear
+ * later commit timestamps, conforming to the design outlined atop
+ * worker.c.
*
* XXX Consider waiting for the publisher's clock to catch up with the
* subscriber's before proceeding to the next phase.
Datum
pg_stat_get_subscription_stats(PG_FUNCTION_ARGS)
{
-#define PG_STAT_GET_SUBSCRIPTION_STATS_COLS 11
+#define PG_STAT_GET_SUBSCRIPTION_STATS_COLS 12
Oid subid = PG_GETARG_OID(0);
TupleDesc tupdesc;
Datum values[PG_STAT_GET_SUBSCRIPTION_STATS_COLS] = {0};
INT8OID, -1, 0);
TupleDescInitEntry(tupdesc, (AttrNumber) 6, "confl_update_exists",
INT8OID, -1, 0);
- TupleDescInitEntry(tupdesc, (AttrNumber) 7, "confl_update_missing",
+ TupleDescInitEntry(tupdesc, (AttrNumber) 7, "confl_update_deleted",
INT8OID, -1, 0);
- TupleDescInitEntry(tupdesc, (AttrNumber) 8, "confl_delete_origin_differs",
+ TupleDescInitEntry(tupdesc, (AttrNumber) 8, "confl_update_missing",
INT8OID, -1, 0);
- TupleDescInitEntry(tupdesc, (AttrNumber) 9, "confl_delete_missing",
+ TupleDescInitEntry(tupdesc, (AttrNumber) 9, "confl_delete_origin_differs",
INT8OID, -1, 0);
- TupleDescInitEntry(tupdesc, (AttrNumber) 10, "confl_multiple_unique_conflicts",
+ TupleDescInitEntry(tupdesc, (AttrNumber) 10, "confl_delete_missing",
INT8OID, -1, 0);
- TupleDescInitEntry(tupdesc, (AttrNumber) 11, "stats_reset",
+ TupleDescInitEntry(tupdesc, (AttrNumber) 11, "confl_multiple_unique_conflicts",
+ INT8OID, -1, 0);
+ TupleDescInitEntry(tupdesc, (AttrNumber) 12, "stats_reset",
TIMESTAMPTZOID, -1, 0);
BlessTupleDesc(tupdesc);
*/
/* yyyymmddN */
-#define CATALOG_VERSION_NO 202507231
+#define CATALOG_VERSION_NO 202508041
#endif
{ oid => '6231', descr => 'statistics: information about subscription stats',
proname => 'pg_stat_get_subscription_stats', provolatile => 's',
proparallel => 'r', prorettype => 'record', proargtypes => 'oid',
- proallargtypes => '{oid,oid,int8,int8,int8,int8,int8,int8,int8,int8,int8,timestamptz}',
- proargmodes => '{i,o,o,o,o,o,o,o,o,o,o,o}',
- proargnames => '{subid,subid,apply_error_count,sync_error_count,confl_insert_exists,confl_update_origin_differs,confl_update_exists,confl_update_missing,confl_delete_origin_differs,confl_delete_missing,confl_multiple_unique_conflicts,stats_reset}',
+ proallargtypes => '{oid,oid,int8,int8,int8,int8,int8,int8,int8,int8,int8,int8,timestamptz}',
+ proargmodes => '{i,o,o,o,o,o,o,o,o,o,o,o,o}',
+ proargnames => '{subid,subid,apply_error_count,sync_error_count,confl_insert_exists,confl_update_origin_differs,confl_update_exists,confl_update_deleted,confl_update_missing,confl_delete_origin_differs,confl_delete_missing,confl_multiple_unique_conflicts,stats_reset}',
prosrc => 'pg_stat_get_subscription_stats' },
{ oid => '6118', descr => 'statistics: information about subscription',
proname => 'pg_stat_get_subscription', prorows => '10', proisstrict => 'f',
#ifndef EXECUTOR_H
#define EXECUTOR_H
+#include "datatype/timestamp.h"
#include "executor/execdesc.h"
#include "fmgr.h"
#include "nodes/lockoptions.h"
TupleTableSlot *outslot);
extern bool RelationFindReplTupleSeq(Relation rel, LockTupleMode lockmode,
TupleTableSlot *searchslot, TupleTableSlot *outslot);
-
+extern bool RelationFindDeletedTupleInfoSeq(Relation rel,
+ TupleTableSlot *searchslot,
+ TransactionId oldestxmin,
+ TransactionId *delete_xid,
+ RepOriginId *delete_origin,
+ TimestampTz *delete_time);
+extern bool RelationFindDeletedTupleInfoByIndex(Relation rel, Oid idxoid,
+ TupleTableSlot *searchslot,
+ TransactionId oldestxmin,
+ TransactionId *delete_xid,
+ RepOriginId *delete_origin,
+ TimestampTz *delete_time);
extern void ExecSimpleRelationInsert(ResultRelInfo *resultRelInfo,
EState *estate, TupleTableSlot *slot);
extern void ExecSimpleRelationUpdate(ResultRelInfo *resultRelInfo,
/* The updated row value violates unique constraint */
CT_UPDATE_EXISTS,
+ /* The row to be updated was concurrently deleted by a different origin */
+ CT_UPDATE_DELETED,
+
/* The row to be updated is missing */
CT_UPDATE_MISSING,
bool parallel_apply;
/*
- * The changes made by this and later transactions must be retained to
- * ensure reliable conflict detection during the apply phase.
+ * Changes made by this transaction and subsequent ones must be preserved.
+ * This ensures that update_deleted conflicts can be accurately detected
+ * during the apply phase of logical replication by this worker.
*
* The logical replication launcher manages an internal replication slot
* named "pg_conflict_detection". It asynchronously collects this ID to
ss.confl_insert_exists,
ss.confl_update_origin_differs,
ss.confl_update_exists,
+ ss.confl_update_deleted,
ss.confl_update_missing,
ss.confl_delete_origin_differs,
ss.confl_delete_missing,
ss.confl_multiple_unique_conflicts,
ss.stats_reset
FROM pg_subscription s,
- LATERAL pg_stat_get_subscription_stats(s.oid) ss(subid, apply_error_count, sync_error_count, confl_insert_exists, confl_update_origin_differs, confl_update_exists, confl_update_missing, confl_delete_origin_differs, confl_delete_missing, confl_multiple_unique_conflicts, stats_reset);
+ LATERAL pg_stat_get_subscription_stats(s.oid) ss(subid, apply_error_count, sync_error_count, confl_insert_exists, confl_update_origin_differs, confl_update_exists, confl_update_deleted, confl_update_missing, confl_delete_origin_differs, confl_delete_missing, confl_multiple_unique_conflicts, stats_reset);
pg_stat_sys_indexes| SELECT relid,
indexrelid,
schemaname,
# Setup a bidirectional logical replication between node_A & node_B
###############################################################################
-# Initialize nodes.
+# Initialize nodes. Enable the track_commit_timestamp on both nodes to detect
+# the conflict when attempting to update a row that was previously modified by
+# a different origin.
# node_A. Increase the log_min_messages setting to DEBUG2 to debug test
# failures. Disable autovacuum to avoid generating xid that could affect the
my $node_A = $node_publisher;
$node_A->append_conf(
'postgresql.conf',
- qq{autovacuum = off
+ qq{track_commit_timestamp = on
+ autovacuum = off
log_min_messages = 'debug2'});
$node_A->restart;
###############################################################################
# Check that dead tuples on node A cannot be cleaned by VACUUM until the
# concurrent transactions on Node B have been applied and flushed on Node A.
+# Also, check that an update_deleted conflict is detected when updating a row
+# that was deleted by a different origin.
###############################################################################
# Insert a record
"SELECT count(*) = 0 FROM pg_stat_activity WHERE backend_type = 'logical replication apply worker'"
);
+my $log_location = -s $node_B->logfile;
+
$node_B->safe_psql('postgres', "UPDATE tab SET b = 3 WHERE a = 1;");
$node_A->safe_psql('postgres', "DELETE FROM tab WHERE a = 1;");
qr/1 are dead but not yet removable/,
'the deleted column is non-removable');
+# Ensure the DELETE is replayed on Node B
+$node_A->wait_for_catchup($subname_BA);
+
+# Check the conflict detected on Node B
+my $logfile = slurp_file($node_B->logfile(), $log_location);
+ok( $logfile =~
+ qr/conflict detected on relation "public.tab": conflict=delete_origin_differs.*
+.*DETAIL:.* Deleting the row that was modified locally in transaction [0-9]+ at .*
+.*Existing local tuple \(1, 3\); replica identity \(a\)=\(1\)/,
+ 'delete target row was modified in tab');
+
+$log_location = -s $node_A->logfile;
+
$node_A->safe_psql(
'postgres', "ALTER SUBSCRIPTION $subname_AB ENABLE;");
$node_B->wait_for_catchup($subname_AB);
+$logfile = slurp_file($node_A->logfile(), $log_location);
+ok( $logfile =~
+ qr/conflict detected on relation "public.tab": conflict=update_deleted.*
+.*DETAIL:.* The row to be updated was deleted locally in transaction [0-9]+ at .*
+.*Remote tuple \(1, 3\); replica identity \(a\)=\(1\)/,
+ 'update target row was deleted in tab');
+
# Remember the next transaction ID to be assigned
my $next_xid = $node_A->safe_psql('postgres', "SELECT txid_current() + 1;");
qr/1 removed, 1 remain, 0 are dead but not yet removable/,
'the deleted column is removed');
+###############################################################################
+# Ensure that the deleted tuple needed to detect an update_deleted conflict is
+# accessible via a sequential table scan.
+###############################################################################
+
+# Drop the primary key from tab on node A and set REPLICA IDENTITY to FULL to
+# enforce sequential scanning of the table.
+$node_A->safe_psql('postgres', "ALTER TABLE tab REPLICA IDENTITY FULL");
+$node_B->safe_psql('postgres', "ALTER TABLE tab REPLICA IDENTITY FULL");
+$node_A->safe_psql('postgres', "ALTER TABLE tab DROP CONSTRAINT tab_pkey;");
+
+# Disable the logical replication from node B to node A
+$node_A->safe_psql('postgres', "ALTER SUBSCRIPTION $subname_AB DISABLE");
+
+# Wait for the apply worker to stop
+$node_A->poll_query_until('postgres',
+ "SELECT count(*) = 0 FROM pg_stat_activity WHERE backend_type = 'logical replication apply worker'"
+);
+
+$node_B->safe_psql('postgres', "UPDATE tab SET b = 4 WHERE a = 2;");
+$node_A->safe_psql('postgres', "DELETE FROM tab WHERE a = 2;");
+
+$log_location = -s $node_A->logfile;
+
+$node_A->safe_psql(
+ 'postgres', "ALTER SUBSCRIPTION $subname_AB ENABLE;");
+$node_B->wait_for_catchup($subname_AB);
+
+$logfile = slurp_file($node_A->logfile(), $log_location);
+ok( $logfile =~
+ qr/conflict detected on relation "public.tab": conflict=update_deleted.*
+.*DETAIL:.* The row to be updated was deleted locally in transaction [0-9]+ at .*
+.*Remote tuple \(2, 4\); replica identity full \(2, 2\)/,
+ 'update target row was deleted in tab');
+
###############################################################################
# Check that the replication slot pg_conflict_detection is dropped after
# removing all the subscriptions.