]> git.ipfire.org Git - thirdparty/postgresql.git/commitdiff
Detect and report update_deleted conflicts.
authorAmit Kapila <akapila@postgresql.org>
Mon, 4 Aug 2025 04:02:47 +0000 (04:02 +0000)
committerAmit Kapila <akapila@postgresql.org>
Mon, 4 Aug 2025 04:02:47 +0000 (04:02 +0000)
This enhancement builds upon the infrastructure introduced in commit
228c370868, which enables the preservation of deleted tuples and their
origin information on the subscriber. This capability is crucial for
handling concurrent transactions replicated from remote nodes.

The update introduces support for detecting update_deleted conflicts
during the application of update operations on the subscriber. When an
update operation fails to locate the target row — typically because it has
been concurrently deleted — we perform an additional table scan. This scan
uses the SnapshotAny mechanism and we do this additional scan only when
the retain_dead_tuples option is enabled for the relevant subscription.

The goal of this scan is to locate the most recently deleted tuple — matching
the old column values from the remote update — that has not yet been removed
by VACUUM and is still visible according to our slot (i.e., its deletion
is not older than conflict-detection-slot's xmin). If such a tuple is
found, the system reports an update_deleted conflict, including the origin
and transaction details responsible for the deletion.

This provides the groundwork for a more robust and accurate conflict
resolution process, preventing unexpected behavior by correctly
identifying cases where a remote update clashes with a deletion from
another origin.

Author: Zhijie Hou <houzj.fnst@fujitsu.com>
Reviewed-by: shveta malik <shveta.malik@gmail.com>
Reviewed-by: Nisha Moond <nisha.moond412@gmail.com>
Reviewed-by: Dilip Kumar <dilipbalaut@gmail.com>
Reviewed-by: Hayato Kuroda <kuroda.hayato@fujitsu.com>
Reviewed-by: Amit Kapila <amit.kapila16@gmail.com>
Discussion: https://postgr.es/m/OS0PR01MB5716BE80DAEB0EE2A6A5D1F5949D2@OS0PR01MB5716.jpnprd01.prod.outlook.com

16 files changed:
doc/src/sgml/catalogs.sgml
doc/src/sgml/logical-replication.sgml
doc/src/sgml/monitoring.sgml
doc/src/sgml/ref/create_subscription.sgml
src/backend/catalog/system_views.sql
src/backend/executor/execReplication.c
src/backend/replication/logical/conflict.c
src/backend/replication/logical/worker.c
src/backend/utils/adt/pgstatfuncs.c
src/include/catalog/catversion.h
src/include/catalog/pg_proc.dat
src/include/executor/executor.h
src/include/replication/conflict.h
src/include/replication/worker_internal.h
src/test/regress/expected/rules.out
src/test/subscription/t/035_conflicts.pl

index 97f547b3cc4b2f6e578787427ba84a14c072287d..da8a78825809fc06ed0bf6ebb474ce19d6f40935 100644 (file)
@@ -8087,7 +8087,8 @@ SCRAM-SHA-256$<replaceable>&lt;iteration count&gt;</replaceable>:<replaceable>&l
        <structfield>subretaindeadtuples</structfield> <type>bool</type>
       </para>
       <para>
-       If true, the information (e.g., dead tuples, commit timestamps, and
+       If true, the detection of <xref linkend="conflict-update-deleted"/> is
+       enabled and the information (e.g., dead tuples, commit timestamps, and
        origins) on the subscriber that is useful for conflict detection is
        retained.
       </para></entry>
index fcac55aefe6653e195f77e3632533d5bb61fe7ea..a0761cfee3f6d637902bda462046b06b24987c49 100644 (file)
@@ -1804,6 +1804,22 @@ Publications:
       </para>
      </listitem>
     </varlistentry>
+    <varlistentry id="conflict-update-deleted" xreflabel="update_deleted">
+     <term><literal>update_deleted</literal></term>
+     <listitem>
+      <para>
+       The tuple to be updated was concurrently deleted by another origin. The
+       update will simply be skipped in this scenario. Note that this conflict
+       can only be detected when
+       <link linkend="guc-track-commit-timestamp"><varname>track_commit_timestamp</varname></link>
+       and <link linkend="sql-createsubscription-params-with-retain-dead-tuples"><literal>retain_dead_tuples</literal></link>
+       are enabled. Note that if a tuple cannot be found due to the table being
+       truncated, only an <literal>update_missing</literal> conflict will
+       arise. Additionally, if the tuple was deleted by the same origin, an
+       <literal>update_missing</literal> conflict will arise.
+      </para>
+     </listitem>
+    </varlistentry>
     <varlistentry id="conflict-update-missing" xreflabel="update_missing">
      <term><literal>update_missing</literal></term>
      <listitem>
index 823afe1b30b22d2e0742a44f2322012228293085..fa78031ccbbf08ea9d7654a0d80d70a025d318fb 100644 (file)
@@ -2223,6 +2223,17 @@ description | Waiting for a newly initialized WAL file to reach durable storage
       </para></entry>
      </row>
 
+     <row>
+      <entry role="catalog_table_entry"><para role="column_definition">
+       <structfield>confl_update_deleted</structfield> <type>bigint</type>
+      </para>
+      <para>
+       Number of times the tuple to be updated was concurrently deleted by
+       another source during the application of changes. See <xref linkend="conflict-update-deleted"/>
+       for details about this conflict.
+      </para></entry>
+     </row>
+
      <row>
       <entry role="catalog_table_entry"><para role="column_definition">
        <structfield>confl_update_missing</structfield> <type>bigint</type>
index b8cd15f32806bfb7c04d196f0b13022e401893a7..247c5bd260410c21264fc49140c5a4ece0106f96 100644 (file)
@@ -445,10 +445,11 @@ CREATE SUBSCRIPTION <replaceable class="parameter">subscription_name</replaceabl
           Specifies whether the information (e.g., dead tuples, commit
           timestamps, and origins) required for conflict detection on the
           subscriber is retained. The default is <literal>false</literal>.
-          If set to <literal>true</literal>, a physical replication slot named
-          <quote><literal>pg_conflict_detection</literal></quote> will be
-          created on the subscriber to prevent the conflict information from
-          being removed.
+          If set to <literal>true</literal>, the detection of
+          <xref linkend="conflict-update-deleted"/> is enabled, and a physical
+          replication slot named <quote><literal>pg_conflict_detection</literal></quote>
+          will be created on the subscriber to prevent the information for detecting
+          conflicts from being removed.
          </para>
 
          <para>
index f6eca09ee153aa31a807a8c55ad82efff2292fa5..77c693f630e4b9f6dc2ebc9319bbd450658c98bb 100644 (file)
@@ -1399,6 +1399,7 @@ CREATE VIEW pg_stat_subscription_stats AS
         ss.confl_insert_exists,
         ss.confl_update_origin_differs,
         ss.confl_update_exists,
+        ss.confl_update_deleted,
         ss.confl_update_missing,
         ss.confl_delete_origin_differs,
         ss.confl_delete_missing,
index f262e7a66f771da13df52d234c2ebe2928767236..68184f5d671e2c1affd0119df747ddd1e9f1b5cc 100644 (file)
 
 #include "postgres.h"
 
+#include "access/commit_ts.h"
#include "access/genam.h"
#include "access/gist.h"
+#include "access/heapam.h"
#include "access/relscan.h"
#include "access/tableam.h"
#include "access/transam.h"
#include "access/xact.h"
 #include "catalog/pg_am_d.h"
 #include "commands/trigger.h"
 #include "executor/executor.h"
@@ -36,7 +38,7 @@
 
 
 static bool tuples_equal(TupleTableSlot *slot1, TupleTableSlot *slot2,
-                                                TypeCacheEntry **eq);
+                                                TypeCacheEntry **eq, Bitmapset *columns);
 
 /*
  * Setup a ScanKey for a search in the relation 'rel' for a tuple 'key' that
@@ -221,7 +223,7 @@ retry:
                        if (eq == NULL)
                                eq = palloc0(sizeof(*eq) * outslot->tts_tupleDescriptor->natts);
 
-                       if (!tuples_equal(outslot, searchslot, eq))
+                       if (!tuples_equal(outslot, searchslot, eq, NULL))
                                continue;
                }
 
@@ -277,10 +279,13 @@ retry:
 
 /*
  * Compare the tuples in the slots by checking if they have equal values.
+ *
+ * If 'columns' is not null, only the columns specified within it will be
+ * considered for the equality check, ignoring all other columns.
  */
 static bool
 tuples_equal(TupleTableSlot *slot1, TupleTableSlot *slot2,
-                        TypeCacheEntry **eq)
+                        TypeCacheEntry **eq, Bitmapset *columns)
 {
        int                     attrnum;
 
@@ -305,6 +310,14 @@ tuples_equal(TupleTableSlot *slot1, TupleTableSlot *slot2,
                if (att->attisdropped || att->attgenerated)
                        continue;
 
+               /*
+                * Ignore columns that are not listed for checking.
+                */
+               if (columns &&
+                       !bms_is_member(att->attnum - FirstLowInvalidHeapAttributeNumber,
+                                                  columns))
+                       continue;
+
                /*
                 * If one value is NULL and other is not, then they are certainly not
                 * equal
@@ -380,7 +393,7 @@ retry:
        /* Try to find the tuple */
        while (table_scan_getnextslot(scan, ForwardScanDirection, scanslot))
        {
-               if (!tuples_equal(scanslot, searchslot, eq))
+               if (!tuples_equal(scanslot, searchslot, eq, NULL))
                        continue;
 
                found = true;
@@ -455,6 +468,236 @@ BuildConflictIndexInfo(ResultRelInfo *resultRelInfo, Oid conflictindex)
        }
 }
 
+/*
+ * If the tuple is recently dead and was deleted by a transaction with a newer
+ * commit timestamp than previously recorded, update the associated transaction
+ * ID, commit time, and origin. This helps ensure that conflict detection uses
+ * the most recent and relevant deletion metadata.
+ */
+static void
+update_most_recent_deletion_info(TupleTableSlot *scanslot,
+                                                                TransactionId oldestxmin,
+                                                                TransactionId *delete_xid,
+                                                                TimestampTz *delete_time,
+                                                                RepOriginId *delete_origin)
+{
+       BufferHeapTupleTableSlot *hslot;
+       HeapTuple       tuple;
+       Buffer          buf;
+       bool            recently_dead = false;
+       TransactionId xmax;
+       TimestampTz localts;
+       RepOriginId localorigin;
+
+       hslot = (BufferHeapTupleTableSlot *) scanslot;
+
+       tuple = ExecFetchSlotHeapTuple(scanslot, false, NULL);
+       buf = hslot->buffer;
+
+       LockBuffer(buf, BUFFER_LOCK_SHARE);
+
+       /*
+        * We do not consider HEAPTUPLE_DEAD status because it indicates either
+        * tuples whose inserting transaction was aborted (meaning there is no
+        * commit timestamp or origin), or tuples deleted by a transaction older
+        * than oldestxmin, making it safe to ignore them during conflict
+        * detection (See comments atop worker.c for details).
+        */
+       if (HeapTupleSatisfiesVacuum(tuple, oldestxmin, buf) == HEAPTUPLE_RECENTLY_DEAD)
+               recently_dead = true;
+
+       LockBuffer(buf, BUFFER_LOCK_UNLOCK);
+
+       if (!recently_dead)
+               return;
+
+       xmax = HeapTupleHeaderGetUpdateXid(tuple->t_data);
+       if (!TransactionIdIsValid(xmax))
+               return;
+
+       /* Select the dead tuple with the most recent commit timestamp */
+       if (TransactionIdGetCommitTsData(xmax, &localts, &localorigin) &&
+               TimestampDifferenceExceeds(*delete_time, localts, 0))
+       {
+               *delete_xid = xmax;
+               *delete_time = localts;
+               *delete_origin = localorigin;
+       }
+}
+
+/*
+ * Searches the relation 'rel' for the most recently deleted tuple that matches
+ * the values in 'searchslot' and is not yet removable by VACUUM. The function
+ * returns the transaction ID, origin, and commit timestamp of the transaction
+ * that deleted this tuple.
+ *
+ * 'oldestxmin' acts as a cutoff transaction ID. Tuples deleted by transactions
+ * with IDs >= 'oldestxmin' are considered recently dead and are eligible for
+ * conflict detection.
+ *
+ * Instead of stopping at the first match, we scan all matching dead tuples to
+ * identify the most recent deletion. This is crucial because only the latest
+ * deletion is relevant for resolving conflicts.
+ *
+ * For example, consider a scenario on the subscriber where a row is deleted,
+ * re-inserted, and then deleted again only on the subscriber:
+ *
+ *   - (pk, 1) - deleted at 9:00,
+ *   - (pk, 1) - deleted at 9:02,
+ *
+ * Now, a remote update arrives: (pk, 1) -> (pk, 2), timestamped at 9:01.
+ *
+ * If we mistakenly return the older deletion (9:00), the system may wrongly
+ * apply the remote update using a last-update-wins strategy. Instead, we must
+ * recognize the more recent deletion at 9:02 and skip the update. See
+ * comments atop worker.c for details. Note, as of now, conflict resolution
+ * is not implemented. Consequently, the system may incorrectly report the
+ * older tuple as the conflicted one, leading to misleading results.
+ *
+ * The commit timestamp of the deleting transaction is used to determine which
+ * tuple was deleted most recently.
+ */
+bool
+RelationFindDeletedTupleInfoSeq(Relation rel, TupleTableSlot *searchslot,
+                                                               TransactionId oldestxmin,
+                                                               TransactionId *delete_xid,
+                                                               RepOriginId *delete_origin,
+                                                               TimestampTz *delete_time)
+{
+       TupleTableSlot *scanslot;
+       TableScanDesc scan;
+       TypeCacheEntry **eq;
+       Bitmapset  *indexbitmap;
+       TupleDesc       desc PG_USED_FOR_ASSERTS_ONLY = RelationGetDescr(rel);
+
+       Assert(equalTupleDescs(desc, searchslot->tts_tupleDescriptor));
+
+       *delete_xid = InvalidTransactionId;
+       *delete_origin = InvalidRepOriginId;
+       *delete_time = 0;
+
+       /*
+        * If the relation has a replica identity key or a primary key that is
+        * unusable for locating deleted tuples (see
+        * IsIndexUsableForFindingDeletedTuple), a full table scan becomes
+        * necessary. In such cases, comparing the entire tuple is not required,
+        * since the remote tuple might not include all column values. Instead,
+        * the indexed columns alone are sufficient to identify the target tuple
+        * (see logicalrep_rel_mark_updatable).
+        */
+       indexbitmap = RelationGetIndexAttrBitmap(rel,
+                                                                                        INDEX_ATTR_BITMAP_IDENTITY_KEY);
+
+       /* fallback to PK if no replica identity */
+       if (!indexbitmap)
+               indexbitmap = RelationGetIndexAttrBitmap(rel,
+                                                                                                INDEX_ATTR_BITMAP_PRIMARY_KEY);
+
+       eq = palloc0(sizeof(*eq) * searchslot->tts_tupleDescriptor->natts);
+
+       /*
+        * Start a heap scan using SnapshotAny to identify dead tuples that are
+        * not visible under a standard MVCC snapshot. Tuples from transactions
+        * not yet committed or those just committed prior to the scan are
+        * excluded in update_most_recent_deletion_info().
+        */
+       scan = table_beginscan(rel, SnapshotAny, 0, NULL);
+       scanslot = table_slot_create(rel, NULL);
+
+       table_rescan(scan, NULL);
+
+       /* Try to find the tuple */
+       while (table_scan_getnextslot(scan, ForwardScanDirection, scanslot))
+       {
+               if (!tuples_equal(scanslot, searchslot, eq, indexbitmap))
+                       continue;
+
+               update_most_recent_deletion_info(scanslot, oldestxmin, delete_xid,
+                                                                                delete_time, delete_origin);
+       }
+
+       table_endscan(scan);
+       ExecDropSingleTupleTableSlot(scanslot);
+
+       return *delete_time != 0;
+}
+
+/*
+ * Similar to RelationFindDeletedTupleInfoSeq() but using index scan to locate
+ * the deleted tuple.
+ */
+bool
+RelationFindDeletedTupleInfoByIndex(Relation rel, Oid idxoid,
+                                                                       TupleTableSlot *searchslot,
+                                                                       TransactionId oldestxmin,
+                                                                       TransactionId *delete_xid,
+                                                                       RepOriginId *delete_origin,
+                                                                       TimestampTz *delete_time)
+{
+       Relation        idxrel;
+       ScanKeyData skey[INDEX_MAX_KEYS];
+       int                     skey_attoff;
+       IndexScanDesc scan;
+       TupleTableSlot *scanslot;
+       TypeCacheEntry **eq = NULL;
+       bool            isIdxSafeToSkipDuplicates;
+       TupleDesc       desc PG_USED_FOR_ASSERTS_ONLY = RelationGetDescr(rel);
+
+       Assert(equalTupleDescs(desc, searchslot->tts_tupleDescriptor));
+       Assert(OidIsValid(idxoid));
+
+       *delete_xid = InvalidTransactionId;
+       *delete_time = 0;
+       *delete_origin = InvalidRepOriginId;
+
+       isIdxSafeToSkipDuplicates = (GetRelationIdentityOrPK(rel) == idxoid);
+
+       scanslot = table_slot_create(rel, NULL);
+
+       idxrel = index_open(idxoid, RowExclusiveLock);
+
+       /* Build scan key. */
+       skey_attoff = build_replindex_scan_key(skey, rel, idxrel, searchslot);
+
+       /*
+        * Start an index scan using SnapshotAny to identify dead tuples that are
+        * not visible under a standard MVCC snapshot. Tuples from transactions
+        * not yet committed or those just committed prior to the scan are
+        * excluded in update_most_recent_deletion_info().
+        */
+       scan = index_beginscan(rel, idxrel, SnapshotAny, NULL, skey_attoff, 0);
+
+       index_rescan(scan, skey, skey_attoff, NULL, 0);
+
+       /* Try to find the tuple */
+       while (index_getnext_slot(scan, ForwardScanDirection, scanslot))
+       {
+               /*
+                * Avoid expensive equality check if the index is primary key or
+                * replica identity index.
+                */
+               if (!isIdxSafeToSkipDuplicates)
+               {
+                       if (eq == NULL)
+                               eq = palloc0(sizeof(*eq) * scanslot->tts_tupleDescriptor->natts);
+
+                       if (!tuples_equal(scanslot, searchslot, eq, NULL))
+                               continue;
+               }
+
+               update_most_recent_deletion_info(scanslot, oldestxmin, delete_xid,
+                                                                                delete_time, delete_origin);
+       }
+
+       index_endscan(scan);
+
+       index_close(idxrel, NoLock);
+
+       ExecDropSingleTupleTableSlot(scanslot);
+
+       return *delete_time != 0;
+}
+
 /*
  * Find the tuple that violates the passed unique index (conflictindex).
  *
index 97c4e26b58654cacef034776dd2bf8c4ab450625..2fd3e8bbda50b07d9a760df5da6bd2dfe40e3f56 100644 (file)
@@ -29,6 +29,7 @@ static const char *const ConflictTypeNames[] = {
        [CT_UPDATE_EXISTS] = "update_exists",
+       [CT_UPDATE_DELETED] = "update_deleted",
        [CT_UPDATE_MISSING] = "update_missing",
        [CT_DELETE_ORIGIN_DIFFERS] = "delete_origin_differs",
        [CT_DELETE_MISSING] = "delete_missing",
        [CT_MULTIPLE_UNIQUE_CONFLICTS] = "multiple_unique_conflicts"
 };
@@ -176,6 +177,7 @@ errcode_apply_conflict(ConflictType type)
                case CT_UPDATE_ORIGIN_DIFFERS:
+               case CT_UPDATE_DELETED:
                case CT_UPDATE_MISSING:
                case CT_DELETE_ORIGIN_DIFFERS:
                case CT_DELETE_MISSING:
                        return errcode(ERRCODE_T_R_SERIALIZATION_FAILURE);
        }
@@ -261,6 +263,26 @@ errdetail_apply_conflict(EState *estate, ResultRelInfo *relinfo,
 
                        break;
 
+               case CT_UPDATE_DELETED:
+                       if (localts)
+                       {
+                               if (localorigin == InvalidRepOriginId)
+                                       appendStringInfo(&err_detail, _("The row to be updated was deleted locally in transaction %u at %s."),
+                                                                        localxmin, timestamptz_to_str(localts));
+                               else if (replorigin_by_oid(localorigin, true, &origin_name))
+                                       appendStringInfo(&err_detail, _("The row to be updated was deleted by a different origin \"%s\" in transaction %u at %s."),
+                                                                        origin_name, localxmin, timestamptz_to_str(localts));
+
+                               /* The origin that modified this row has been removed. */
+                               else
+                                       appendStringInfo(&err_detail, _("The row to be updated was deleted by a non-existent origin in transaction %u at %s."),
+                                                                        localxmin, timestamptz_to_str(localts));
+                       }
+                       else
+                               appendStringInfo(&err_detail, _("The row to be updated was deleted."));
+
+                       break;
+
                case CT_UPDATE_MISSING:
                        appendStringInfoString(&err_detail, _("Could not find the row to be updated."));
                        break;
index b59221c4d063602556a814322a9af2578bfeb83e..89e241c83928069835940b2b9e61e3be37e05c62 100644 (file)
  * Each apply worker that enabled retain_dead_tuples option maintains a
  * non-removable transaction ID (oldest_nonremovable_xid) in shared memory to
  * prevent dead rows from being removed prematurely when the apply worker still
- * needs them to detect conflicts reliably. This helps to retain the required
- * commit_ts module information, which further helps to detect
- * update_origin_differs and delete_origin_differs conflicts reliably, as
+ * needs them to detect update_deleted conflicts. Additionally, this helps to
+ * retain the required commit_ts module information, which further helps to
+ * detect update_origin_differs and delete_origin_differs conflicts reliably, as
  * otherwise, vacuum freeze could remove the required information.
  *
  * The logical replication launcher manages an internal replication slot named
  * transactions that occurred concurrently with the tuple DELETE, any
  * subsequent UPDATE from a remote node should have a later timestamp. In such
  * cases, it is acceptable to detect an update_missing scenario and convert the
- * UPDATE to an INSERT when applying it. But, detecting concurrent remote
- * transactions with earlier timestamps than the DELETE is necessary, as the
- * UPDATEs in remote transactions should be ignored if their timestamp is
- * earlier than that of the dead tuples.
+ * UPDATE to an INSERT when applying it. But, for concurrent remote
+ * transactions with earlier timestamps than the DELETE, detecting
+ * update_deleted is necessary, as the UPDATEs in remote transactions should be
+ * ignored if their timestamp is earlier than that of the dead tuples.
  *
  * Note that advancing the non-removable transaction ID is not supported if the
  * publisher is also a physical standby. This is because the logical walsender
@@ -576,6 +576,12 @@ static bool FindReplTupleInLocalRel(ApplyExecutionData *edata, Relation localrel
                                                                        Oid localidxoid,
                                                                        TupleTableSlot *remoteslot,
                                                                        TupleTableSlot **localslot);
+static bool FindDeletedTupleInLocalRel(Relation localrel,
+                                                                          Oid localidxoid,
+                                                                          TupleTableSlot *remoteslot,
+                                                                          TransactionId *delete_xid,
+                                                                          RepOriginId *delete_origin,
+                                                                          TimestampTz *delete_time);
 static void apply_handle_tuple_routing(ApplyExecutionData *edata,
                                                                           TupleTableSlot *remoteslot,
                                                                           LogicalRepTupleData *newtup,
@@ -2912,17 +2918,31 @@ apply_handle_update_internal(ApplyExecutionData *edata,
        }
        else
        {
+               ConflictType type;
                TupleTableSlot *newslot = localslot;
 
+               /*
+                * Detecting whether the tuple was recently deleted or never existed
+                * is crucial to avoid misleading the user during conflict handling.
+                */
+               if (FindDeletedTupleInLocalRel(localrel, localindexoid, remoteslot,
+                                                                          &conflicttuple.xmin,
+                                                                          &conflicttuple.origin,
+                                                                          &conflicttuple.ts) &&
+                       conflicttuple.origin != replorigin_session_origin)
+                       type = CT_UPDATE_DELETED;
+               else
+                       type = CT_UPDATE_MISSING;
+
                /* Store the new tuple for conflict reporting */
                slot_store_data(newslot, relmapentry, newtup);
 
                /*
-                * The tuple to be updated could not be found.  Do nothing except for
-                * emitting a log message.
+                * The tuple to be updated could not be found or was deleted.  Do
+                * nothing except for emitting a log message.
                 */
-               ReportApplyConflict(estate, relinfo, LOG, CT_UPDATE_MISSING,
-                                                       remoteslot, newslot, list_make1(&conflicttuple));
+               ReportApplyConflict(estate, relinfo, LOG, type, remoteslot, newslot,
+                                                       list_make1(&conflicttuple));
        }
 
        /* Cleanup. */
@@ -3142,6 +3162,112 @@ FindReplTupleInLocalRel(ApplyExecutionData *edata, Relation localrel,
        return found;
 }
 
+/*
+ * Determine whether the index can reliably locate the deleted tuple in the
+ * local relation.
+ *
+ * An index may exclude deleted tuples if it was re-indexed or re-created during
+ * change application. Therefore, an index is considered usable only if the
+ * conflict detection slot.xmin (conflict_detection_xmin) is greater than the
+ * index tuple's xmin. This ensures that any tuples deleted prior to the index
+ * creation or re-indexing are not relevant for conflict detection in the
+ * current apply worker.
+ *
+ * Note that indexes may also be excluded if they were modified by other DDL
+ * operations, such as ALTER INDEX. However, this is acceptable, as the
+ * likelihood of such DDL changes coinciding with the need to scan dead
+ * tuples for the update_deleted is low.
+ */
+static bool
+IsIndexUsableForFindingDeletedTuple(Oid localindexoid,
+                                                                       TransactionId conflict_detection_xmin)
+{
+       HeapTuple       index_tuple;
+       TransactionId index_xmin;
+
+       index_tuple = SearchSysCache1(INDEXRELID, ObjectIdGetDatum(localindexoid));
+
+       if (!HeapTupleIsValid(index_tuple)) /* should not happen */
+               elog(ERROR, "cache lookup failed for index %u", localindexoid);
+
+       /*
+        * No need to check for a frozen transaction ID, as
+        * TransactionIdPrecedes() manages it internally, treating it as falling
+        * behind the conflict_detection_xmin.
+        */
+       index_xmin = HeapTupleHeaderGetXmin(index_tuple->t_data);
+
+       ReleaseSysCache(index_tuple);
+
+       return TransactionIdPrecedes(index_xmin, conflict_detection_xmin);
+}
+
+/*
+ * Attempts to locate a deleted tuple in the local relation that matches the
+ * values of the tuple received from the publication side (in 'remoteslot').
+ * The search is performed using either the replica identity index, primary
+ * key, other available index, or a sequential scan if necessary.
+ *
+ * Returns true if the deleted tuple is found. If found, the transaction ID,
+ * origin, and commit timestamp of the deletion are stored in '*delete_xid',
+ * '*delete_origin', and '*delete_time' respectively.
+ */
+static bool
+FindDeletedTupleInLocalRel(Relation localrel, Oid localidxoid,
+                                                  TupleTableSlot *remoteslot,
+                                                  TransactionId *delete_xid, RepOriginId *delete_origin,
+                                                  TimestampTz *delete_time)
+{
+       TransactionId oldestxmin;
+       ReplicationSlot *slot;
+
+       /*
+        * Return false if either dead tuples are not retained or commit timestamp
+        * data is not available.
+        */
+       if (!MySubscription->retaindeadtuples || !track_commit_timestamp)
+               return false;
+
+       /*
+        * For conflict detection, we use the conflict slot's xmin value instead
+        * of invoking GetOldestNonRemovableTransactionId(). The slot.xmin acts as
+        * a threshold to identify tuples that were recently deleted. These tuples
+        * are not visible to concurrent transactions, but we log an
+        * update_deleted conflict if such a tuple matches the remote update being
+        * applied.
+        *
+        * Although GetOldestNonRemovableTransactionId() can return a value older
+        * than the slot's xmin, for our current purpose it is acceptable to treat
+        * tuples deleted by transactions prior to slot.xmin as update_missing
+        * conflicts.
+        *
+        * Ideally, we would use oldest_nonremovable_xid, which is directly
+        * maintained by the leader apply worker. However, this value is not
+        * available to table synchronization or parallel apply workers, making
+        * slot.xmin a practical alternative in those contexts.
+        */
+       slot = SearchNamedReplicationSlot(CONFLICT_DETECTION_SLOT, true);
+
+       Assert(slot);
+
+       SpinLockAcquire(&slot->mutex);
+       oldestxmin = slot->data.xmin;
+       SpinLockRelease(&slot->mutex);
+
+       Assert(TransactionIdIsValid(oldestxmin));
+
+       if (OidIsValid(localidxoid) &&
+               IsIndexUsableForFindingDeletedTuple(localidxoid, oldestxmin))
+               return RelationFindDeletedTupleInfoByIndex(localrel, localidxoid,
+                                                                                                  remoteslot, oldestxmin,
+                                                                                                  delete_xid, delete_origin,
+                                                                                                  delete_time);
+       else
+               return RelationFindDeletedTupleInfoSeq(localrel, remoteslot,
+                                                                                          oldestxmin, delete_xid,
+                                                                                          delete_origin, delete_time);
+}
+
 /*
  * This handles insert, update, delete on a partitioned table.
  */
@@ -3260,18 +3386,35 @@ apply_handle_tuple_routing(ApplyExecutionData *edata,
                                                                                                remoteslot_part, &localslot);
                                if (!found)
                                {
+                                       ConflictType type;
                                        TupleTableSlot *newslot = localslot;
 
+                                       /*
+                                        * Detecting whether the tuple was recently deleted or
+                                        * never existed is crucial to avoid misleading the user
+                                        * during conflict handling.
+                                        */
+                                       if (FindDeletedTupleInLocalRel(partrel,
+                                                                                                  part_entry->localindexoid,
+                                                                                                  remoteslot_part,
+                                                                                                  &conflicttuple.xmin,
+                                                                                                  &conflicttuple.origin,
+                                                                                                  &conflicttuple.ts) &&
+                                               conflicttuple.origin != replorigin_session_origin)
+                                               type = CT_UPDATE_DELETED;
+                                       else
+                                               type = CT_UPDATE_MISSING;
+
                                        /* Store the new tuple for conflict reporting */
                                        slot_store_data(newslot, part_entry, newtup);
 
                                        /*
-                                        * The tuple to be updated could not be found.  Do nothing
-                                        * except for emitting a log message.
+                                        * The tuple to be updated could not be found or was
+                                        * deleted.  Do nothing except for emitting a log message.
                                         */
                                        ReportApplyConflict(estate, partrelinfo, LOG,
-                                                                               CT_UPDATE_MISSING, remoteslot_part,
-                                                                               newslot, list_make1(&conflicttuple));
+                                                                               type, remoteslot_part, newslot,
+                                                                               list_make1(&conflicttuple));
 
                                        return;
                                }
@@ -4172,8 +4315,8 @@ can_advance_nonremovable_xid(RetainDeadTuplesData *rdt_data)
 {
        /*
         * It is sufficient to manage non-removable transaction ID for a
-        * subscription by the main apply worker to detect conflicts reliably even
-        * for table sync or parallel apply workers.
+        * subscription by the main apply worker to detect update_deleted reliably
+        * even for table sync or parallel apply workers.
         */
        if (!am_leader_apply_worker())
                return false;
@@ -4374,10 +4517,11 @@ wait_for_local_flush(RetainDeadTuplesData *rdt_data)
         * We expect the publisher and subscriber clocks to be in sync using time
         * sync service like NTP. Otherwise, we will advance this worker's
         * oldest_nonremovable_xid prematurely, leading to the removal of rows
-        * required to detect conflicts reliably. This check primarily addresses
-        * scenarios where the publisher's clock falls behind; if the publisher's
-        * clock is ahead, subsequent transactions will naturally bear later
-        * commit timestamps, conforming to the design outlined atop worker.c.
+        * required to detect update_deleted reliably. This check primarily
+        * addresses scenarios where the publisher's clock falls behind; if the
+        * publisher's clock is ahead, subsequent transactions will naturally bear
+        * later commit timestamps, conforming to the design outlined atop
+        * worker.c.
         *
         * XXX Consider waiting for the publisher's clock to catch up with the
         * subscriber's before proceeding to the next phase.
index 1c12ddbae493c53dfe0f9520e2d9b35716f534e6..c756c2bebaaa055064fdfb3345312b40c3812ecd 100644 (file)
@@ -2171,7 +2171,7 @@ pg_stat_get_replication_slot(PG_FUNCTION_ARGS)
 Datum
 pg_stat_get_subscription_stats(PG_FUNCTION_ARGS)
 {
-#define PG_STAT_GET_SUBSCRIPTION_STATS_COLS    11
+#define PG_STAT_GET_SUBSCRIPTION_STATS_COLS    12
        Oid                     subid = PG_GETARG_OID(0);
        TupleDesc       tupdesc;
        Datum           values[PG_STAT_GET_SUBSCRIPTION_STATS_COLS] = {0};
@@ -2197,15 +2197,17 @@ pg_stat_get_subscription_stats(PG_FUNCTION_ARGS)
                                           INT8OID, -1, 0);
        TupleDescInitEntry(tupdesc, (AttrNumber) 6, "confl_update_exists",
                                           INT8OID, -1, 0);
-       TupleDescInitEntry(tupdesc, (AttrNumber) 7, "confl_update_missing",
+       TupleDescInitEntry(tupdesc, (AttrNumber) 7, "confl_update_deleted",
                                           INT8OID, -1, 0);
-       TupleDescInitEntry(tupdesc, (AttrNumber) 8, "confl_delete_origin_differs",
+       TupleDescInitEntry(tupdesc, (AttrNumber) 8, "confl_update_missing",
                                           INT8OID, -1, 0);
-       TupleDescInitEntry(tupdesc, (AttrNumber) 9, "confl_delete_missing",
+       TupleDescInitEntry(tupdesc, (AttrNumber) 9, "confl_delete_origin_differs",
                                           INT8OID, -1, 0);
-       TupleDescInitEntry(tupdesc, (AttrNumber) 10, "confl_multiple_unique_conflicts",
+       TupleDescInitEntry(tupdesc, (AttrNumber) 10, "confl_delete_missing",
                                           INT8OID, -1, 0);
-       TupleDescInitEntry(tupdesc, (AttrNumber) 11, "stats_reset",
+       TupleDescInitEntry(tupdesc, (AttrNumber) 11, "confl_multiple_unique_conflicts",
+                                          INT8OID, -1, 0);
+       TupleDescInitEntry(tupdesc, (AttrNumber) 12, "stats_reset",
                                           TIMESTAMPTZOID, -1, 0);
        BlessTupleDesc(tupdesc);
 
index 5173d422d468aede01bf6b4cbd6f61c923beef19..750a9d8a09b25978f4d949c886854d6aad7b1c76 100644 (file)
@@ -57,6 +57,6 @@
  */
 
 /*                                                     yyyymmddN */
-#define CATALOG_VERSION_NO     202507231
+#define CATALOG_VERSION_NO     202508041
 
 #endif
index 3ee8fed7e537fad5b4d30b31a1cfc3f6aa11752f..118d6da1ace0e58921a66685bea0839c3e8805b3 100644 (file)
 { oid => '6231', descr => 'statistics: information about subscription stats',
   proname => 'pg_stat_get_subscription_stats', provolatile => 's',
   proparallel => 'r', prorettype => 'record', proargtypes => 'oid',
-  proallargtypes => '{oid,oid,int8,int8,int8,int8,int8,int8,int8,int8,int8,timestamptz}',
-  proargmodes => '{i,o,o,o,o,o,o,o,o,o,o,o}',
-  proargnames => '{subid,subid,apply_error_count,sync_error_count,confl_insert_exists,confl_update_origin_differs,confl_update_exists,confl_update_missing,confl_delete_origin_differs,confl_delete_missing,confl_multiple_unique_conflicts,stats_reset}',
+  proallargtypes => '{oid,oid,int8,int8,int8,int8,int8,int8,int8,int8,int8,int8,timestamptz}',
+  proargmodes => '{i,o,o,o,o,o,o,o,o,o,o,o,o}',
+  proargnames => '{subid,subid,apply_error_count,sync_error_count,confl_insert_exists,confl_update_origin_differs,confl_update_exists,confl_update_deleted,confl_update_missing,confl_delete_origin_differs,confl_delete_missing,confl_multiple_unique_conflicts,stats_reset}',
   prosrc => 'pg_stat_get_subscription_stats' },
 { oid => '6118', descr => 'statistics: information about subscription',
   proname => 'pg_stat_get_subscription', prorows => '10', proisstrict => 'f',
index 104b059544dd35582bd97b1e6f8ea3c251da9e07..a71502efeed7519948f3c20be6f7fa2cf1fa8609 100644 (file)
@@ -14,6 +14,7 @@
 #ifndef EXECUTOR_H
 #define EXECUTOR_H
 
+#include "datatype/timestamp.h"
 #include "executor/execdesc.h"
 #include "fmgr.h"
 #include "nodes/lockoptions.h"
@@ -759,7 +760,18 @@ extern bool RelationFindReplTupleByIndex(Relation rel, Oid idxoid,
                                                                                 TupleTableSlot *outslot);
 extern bool RelationFindReplTupleSeq(Relation rel, LockTupleMode lockmode,
                                                                         TupleTableSlot *searchslot, TupleTableSlot *outslot);
-
+extern bool RelationFindDeletedTupleInfoSeq(Relation rel,
+                                                                                       TupleTableSlot *searchslot,
+                                                                                       TransactionId oldestxmin,
+                                                                                       TransactionId *delete_xid,
+                                                                                       RepOriginId *delete_origin,
+                                                                                       TimestampTz *delete_time);
+extern bool RelationFindDeletedTupleInfoByIndex(Relation rel, Oid idxoid,
+                                                                                               TupleTableSlot *searchslot,
+                                                                                               TransactionId oldestxmin,
+                                                                                               TransactionId *delete_xid,
+                                                                                               RepOriginId *delete_origin,
+                                                                                               TimestampTz *delete_time);
 extern void ExecSimpleRelationInsert(ResultRelInfo *resultRelInfo,
                                                                         EState *estate, TupleTableSlot *slot);
 extern void ExecSimpleRelationUpdate(ResultRelInfo *resultRelInfo,
index 6c59125f25657cad833b5167f2eb832b60ab1e9b..ff3cb8416ecff6d911fa9966eb330ffbfcb66685 100644 (file)
@@ -32,6 +32,9 @@ typedef enum
        /* The updated row value violates unique constraint */
        CT_UPDATE_EXISTS,
 
+       /* The row to be updated was concurrently deleted by a different origin */
+       CT_UPDATE_DELETED,
+
        /* The row to be updated is missing */
        CT_UPDATE_MISSING,
 
index 0c7b8440a61e39b64fa6c83f12e1df7f7c95df83..7c0204dd6f4ce110656a459424981e052dd11928 100644 (file)
@@ -87,8 +87,9 @@ typedef struct LogicalRepWorker
        bool            parallel_apply;
 
        /*
-        * The changes made by this and later transactions must be retained to
-        * ensure reliable conflict detection during the apply phase.
+        * Changes made by this transaction and subsequent ones must be preserved.
+        * This ensures that update_deleted conflicts can be accurately detected
+        * during the apply phase of logical replication by this worker.
         *
         * The logical replication launcher manages an internal replication slot
         * named "pg_conflict_detection". It asynchronously collects this ID to
index dce8c672b40fe19cee324c735897aec9b6bc9499..6509fda77a994ccf849f2aafaf486c56c10af7c6 100644 (file)
@@ -2179,13 +2179,14 @@ pg_stat_subscription_stats| SELECT ss.subid,
     ss.confl_insert_exists,
     ss.confl_update_origin_differs,
     ss.confl_update_exists,
+    ss.confl_update_deleted,
     ss.confl_update_missing,
     ss.confl_delete_origin_differs,
     ss.confl_delete_missing,
     ss.confl_multiple_unique_conflicts,
     ss.stats_reset
    FROM pg_subscription s,
-    LATERAL pg_stat_get_subscription_stats(s.oid) ss(subid, apply_error_count, sync_error_count, confl_insert_exists, confl_update_origin_differs, confl_update_exists, confl_update_missing, confl_delete_origin_differs, confl_delete_missing, confl_multiple_unique_conflicts, stats_reset);
+    LATERAL pg_stat_get_subscription_stats(s.oid) ss(subid, apply_error_count, sync_error_count, confl_insert_exists, confl_update_origin_differs, confl_update_exists, confl_update_deleted, confl_update_missing, confl_delete_origin_differs, confl_delete_missing, confl_multiple_unique_conflicts, stats_reset);
 pg_stat_sys_indexes| SELECT relid,
     indexrelid,
     schemaname,
index 976d53a870e5e8dc639c335c30e7a1bce94315ac..36aeb14c563af7571a8d7e9c4b783a314e68852c 100644 (file)
@@ -150,7 +150,9 @@ pass('multiple_unique_conflicts detected on a leaf partition during insert');
 # Setup a bidirectional logical replication between node_A & node_B
 ###############################################################################
 
-# Initialize nodes.
+# Initialize nodes. Enable the track_commit_timestamp on both nodes to detect
+# the conflict when attempting to update a row that was previously modified by
+# a different origin.
 
 # node_A. Increase the log_min_messages setting to DEBUG2 to debug test
 # failures. Disable autovacuum to avoid generating xid that could affect the
@@ -158,7 +160,8 @@ pass('multiple_unique_conflicts detected on a leaf partition during insert');
 my $node_A = $node_publisher;
 $node_A->append_conf(
        'postgresql.conf',
-       qq{autovacuum = off
+       qq{track_commit_timestamp = on
+       autovacuum = off
        log_min_messages = 'debug2'});
 $node_A->restart;
 
@@ -270,6 +273,8 @@ $node_A->psql('postgres',
 ###############################################################################
 # Check that dead tuples on node A cannot be cleaned by VACUUM until the
 # concurrent transactions on Node B have been applied and flushed on Node A.
+# Also, check that an update_deleted conflict is detected when updating a row
+# that was deleted by a different origin.
 ###############################################################################
 
 # Insert a record
@@ -288,6 +293,8 @@ $node_A->poll_query_until('postgres',
        "SELECT count(*) = 0 FROM pg_stat_activity WHERE backend_type = 'logical replication apply worker'"
 );
 
+my $log_location = -s $node_B->logfile;
+
 $node_B->safe_psql('postgres', "UPDATE tab SET b = 3 WHERE a = 1;");
 $node_A->safe_psql('postgres', "DELETE FROM tab WHERE a = 1;");
 
@@ -299,10 +306,30 @@ ok( $stderr =~
          qr/1 are dead but not yet removable/,
        'the deleted column is non-removable');
 
+# Ensure the DELETE is replayed on Node B
+$node_A->wait_for_catchup($subname_BA);
+
+# Check the conflict detected on Node B
+my $logfile = slurp_file($node_B->logfile(), $log_location);
+ok( $logfile =~
+         qr/conflict detected on relation "public.tab": conflict=delete_origin_differs.*
+.*DETAIL:.* Deleting the row that was modified locally in transaction [0-9]+ at .*
+.*Existing local tuple \(1, 3\); replica identity \(a\)=\(1\)/,
+       'delete target row was modified in tab');
+
+$log_location = -s $node_A->logfile;
+
 $node_A->safe_psql(
        'postgres', "ALTER SUBSCRIPTION $subname_AB ENABLE;");
 $node_B->wait_for_catchup($subname_AB);
 
+$logfile = slurp_file($node_A->logfile(), $log_location);
+ok( $logfile =~
+         qr/conflict detected on relation "public.tab": conflict=update_deleted.*
+.*DETAIL:.* The row to be updated was deleted locally in transaction [0-9]+ at .*
+.*Remote tuple \(1, 3\); replica identity \(a\)=\(1\)/,
+       'update target row was deleted in tab');
+
 # Remember the next transaction ID to be assigned
 my $next_xid = $node_A->safe_psql('postgres', "SELECT txid_current() + 1;");
 
@@ -324,6 +351,41 @@ ok( $stderr =~
          qr/1 removed, 1 remain, 0 are dead but not yet removable/,
        'the deleted column is removed');
 
+###############################################################################
+# Ensure that the deleted tuple needed to detect an update_deleted conflict is
+# accessible via a sequential table scan.
+###############################################################################
+
+# Drop the primary key from tab on node A and set REPLICA IDENTITY to FULL to
+# enforce sequential scanning of the table.
+$node_A->safe_psql('postgres', "ALTER TABLE tab REPLICA IDENTITY FULL");
+$node_B->safe_psql('postgres', "ALTER TABLE tab REPLICA IDENTITY FULL");
+$node_A->safe_psql('postgres', "ALTER TABLE tab DROP CONSTRAINT tab_pkey;");
+
+# Disable the logical replication from node B to node A
+$node_A->safe_psql('postgres', "ALTER SUBSCRIPTION $subname_AB DISABLE");
+
+# Wait for the apply worker to stop
+$node_A->poll_query_until('postgres',
+       "SELECT count(*) = 0 FROM pg_stat_activity WHERE backend_type = 'logical replication apply worker'"
+);
+
+$node_B->safe_psql('postgres', "UPDATE tab SET b = 4 WHERE a = 2;");
+$node_A->safe_psql('postgres', "DELETE FROM tab WHERE a = 2;");
+
+$log_location = -s $node_A->logfile;
+
+$node_A->safe_psql(
+       'postgres', "ALTER SUBSCRIPTION $subname_AB ENABLE;");
+$node_B->wait_for_catchup($subname_AB);
+
+$logfile = slurp_file($node_A->logfile(), $log_location);
+ok( $logfile =~
+         qr/conflict detected on relation "public.tab": conflict=update_deleted.*
+.*DETAIL:.* The row to be updated was deleted locally in transaction [0-9]+ at .*
+.*Remote tuple \(2, 4\); replica identity full \(2, 2\)/,
+       'update target row was deleted in tab');
+
 ###############################################################################
 # Check that the replication slot pg_conflict_detection is dropped after
 # removing all the subscriptions.