git.ipfire.org Git - thirdparty/postgresql.git/commitdiff
Fix re-distributing previously distributed invalidation messages during logical decoding.
author: Masahiko Sawada <msawada@postgresql.org>
Tue, 17 Jun 2025 00:35:58 +0000 (17:35 -0700)
committer: Masahiko Sawada <msawada@postgresql.org>
Tue, 17 Jun 2025 00:35:58 +0000 (17:35 -0700)
Commit 4909b38af0 introduced logic to distribute invalidation messages
from catalog-modifying transactions to all concurrent in-progress
transactions. However, since each transaction distributes not only its
original invalidation messages but also previously distributed
messages to other transactions, this leads to an exponential increase
in allocation request size for invalidation messages, ultimately
causing memory allocation failure.

This commit fixes this issue by tracking distributed invalidation
messages separately per decoded transaction and not redistributing
these messages to other in-progress transactions. The maximum size of
distributed invalidation messages that one transaction can store is
limited to MAX_DISTR_INVAL_MSG_PER_TXN (8MB). Once the size of the
distributed invalidation messages exceeds this threshold, we
invalidate all caches in locations where distributed invalidation
messages need to be executed.

Back-patch to all supported versions where we introduced the fix by
commit 4909b38af0.

Note that this commit adds two new fields to ReorderBufferTXN to store
the distributed invalidation messages. This change breaks ABI
compatibility in back branches, affecting third-party extensions that
depend on the size of the ReorderBufferTXN struct, though this scenario
seems unlikely.

Additionally, it adds a new flag to the txn_flags field of
ReorderBufferTXN to indicate distributed invalidation message
overflow. This should not affect existing implementations, as it is
unlikely that third-party extensions use unused bits in the txn_flags
field.

Bug: #18938 #18942
Author: vignesh C <vignesh21@gmail.com>
Reported-by: Duncan Sands <duncan.sands@deepbluecap.com>
Reported-by: John Hutchins <john.hutchins@wicourts.gov>
Reported-by: Laurence Parry <greenreaper@hotmail.com>
Reported-by: Max Madden <maxmmadden@gmail.com>
Reported-by: Braulio Fdo Gonzalez <brauliofg@gmail.com>
Reviewed-by: Masahiko Sawada <sawada.mshk@gmail.com>
Reviewed-by: Amit Kapila <amit.kapila16@gmail.com>
Reviewed-by: Hayato Kuroda <kuroda.hayato@fujitsu.com>
Discussion: https://postgr.es/m/680bdaf6-f7d1-4536-b580-05c2760c67c6@deepbluecap.com
Discussion: https://postgr.es/m/18942-0ab1e5ae156613ad@postgresql.org
Discussion: https://postgr.es/m/18938-57c9a1c463b68ce0@postgresql.org
Discussion: https://postgr.es/m/CAD1FGCT2sYrP_70RTuo56QTizyc+J3wJdtn2gtO3VttQFpdMZg@mail.gmail.com
Discussion: https://postgr.es/m/CANO2=B=2BT1hSYCE=nuuTnVTnjidMg0+-FfnRnqM6kd23qoygg@mail.gmail.com
Backpatch-through: 13

contrib/test_decoding/expected/invalidation_distribution.out
contrib/test_decoding/specs/invalidation_distribution.spec
src/backend/replication/logical/reorderbuffer.c
src/backend/replication/logical/snapbuild.c
src/include/replication/reorderbuffer.h

index ad0a944cbf303211abeae126aab4ee5145dff8f1..ae53b1e61de3e83c68a51045fb39dcf9bbb18aef 100644 (file)
@@ -1,4 +1,4 @@
-Parsed test spec with 2 sessions
+Parsed test spec with 3 sessions
 
 starting permutation: s1_insert_tbl1 s1_begin s1_insert_tbl1 s2_alter_pub_add_tbl s1_commit s1_insert_tbl1 s2_get_binary_changes
 step s1_insert_tbl1: INSERT INTO tbl1 (val1, val2) VALUES (1, 1);
@@ -18,3 +18,24 @@ count
 stop    
 (1 row)
 
+
+starting permutation: s1_begin s1_insert_tbl1 s3_begin s3_insert_tbl1 s2_alter_pub_add_tbl s1_insert_tbl1 s1_commit s3_commit s2_get_binary_changes
+step s1_begin: BEGIN;
+step s1_insert_tbl1: INSERT INTO tbl1 (val1, val2) VALUES (1, 1);
+step s3_begin: BEGIN;
+step s3_insert_tbl1: INSERT INTO tbl1 (val1, val2) VALUES (2, 2);
+step s2_alter_pub_add_tbl: ALTER PUBLICATION pub ADD TABLE tbl1;
+step s1_insert_tbl1: INSERT INTO tbl1 (val1, val2) VALUES (1, 1);
+step s1_commit: COMMIT;
+step s3_commit: COMMIT;
+step s2_get_binary_changes: SELECT count(data) FROM pg_logical_slot_get_binary_changes('isolation_slot', NULL, NULL, 'proto_version', '4', 'publication_names', 'pub') WHERE get_byte(data, 0) = 73;
+count
+-----
+    1
+(1 row)
+
+?column?
+--------
+stop    
+(1 row)
+
index decbed627e32731f45056811dea77dc4a8083cce..67d41969ac1d6e3b8dd538ebf3a0f6e5f889a004 100644 (file)
@@ -28,5 +28,16 @@ setup { SET synchronous_commit=on; }
 step "s2_alter_pub_add_tbl" { ALTER PUBLICATION pub ADD TABLE tbl1; }
 step "s2_get_binary_changes" { SELECT count(data) FROM pg_logical_slot_get_binary_changes('isolation_slot', NULL, NULL, 'proto_version', '4', 'publication_names', 'pub') WHERE get_byte(data, 0) = 73; }
 
+session "s3"
+setup { SET synchronous_commit=on; }
+step "s3_begin" { BEGIN; }
+step "s3_insert_tbl1" { INSERT INTO tbl1 (val1, val2) VALUES (2, 2); }
+step "s3_commit" { COMMIT; }
+
 # Expect to get one insert change. LOGICAL_REP_MSG_INSERT = 'I'
 permutation "s1_insert_tbl1" "s1_begin" "s1_insert_tbl1" "s2_alter_pub_add_tbl" "s1_commit" "s1_insert_tbl1" "s2_get_binary_changes"
+
+# Expect to get one insert change with LOGICAL_REP_MSG_INSERT = 'I' from
+# the second "s1_insert_tbl1" executed after adding the table tbl1 to the
+# publication in "s2_alter_pub_add_tbl".
+permutation "s1_begin" "s1_insert_tbl1" "s3_begin" "s3_insert_tbl1" "s2_alter_pub_add_tbl" "s1_insert_tbl1" "s1_commit" "s3_commit" "s2_get_binary_changes"
index 03eb005c39d7574be1b049caa3910a116f3fab92..d85a9f2bb2026dbcd01bc8df51bd3e17421d3ad8 100644 (file)
 #include "storage/fd.h"
 #include "storage/sinval.h"
 #include "utils/builtins.h"
+#include "utils/inval.h"
 #include "utils/memutils.h"
 #include "utils/rel.h"
 #include "utils/relfilenumbermap.h"
 
+/*
+ * Each transaction has an 8MB limit for invalidation messages distributed from
+ * other transactions. This limit is set considering scenarios with many
+ * concurrent logical decoding operations. When the distributed invalidation
+ * messages reach this threshold, the transaction is marked as
+ * RBTXN_DISTR_INVAL_OVERFLOWED to invalidate the complete cache as we have lost
+ * some inval messages and hence don't know what needs to be invalidated.
+ */
+#define MAX_DISTR_INVAL_MSG_PER_TXN \
+       ((8 * 1024 * 1024) / sizeof(SharedInvalidationMessage))
+
 /* entry for a hash table we use to map from xid to our transaction state */
 typedef struct ReorderBufferTXNByIdEnt
 {
@@ -469,6 +481,12 @@ ReorderBufferReturnTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
                txn->invalidations = NULL;
        }
 
+       if (txn->invalidations_distributed)
+       {
+               pfree(txn->invalidations_distributed);
+               txn->invalidations_distributed = NULL;
+       }
+
        /* Reset the toast hash */
        ReorderBufferToastReset(rb, txn);
 
@@ -2574,7 +2592,17 @@ ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn,
                AbortCurrentTransaction();
 
                /* make sure there's no cache pollution */
-               ReorderBufferExecuteInvalidations(txn->ninvalidations, txn->invalidations);
+               if (rbtxn_distr_inval_overflowed(txn))
+               {
+                       Assert(txn->ninvalidations_distributed == 0);
+                       InvalidateSystemCaches();
+               }
+               else
+               {
+                       ReorderBufferExecuteInvalidations(txn->ninvalidations, txn->invalidations);
+                       ReorderBufferExecuteInvalidations(txn->ninvalidations_distributed,
+                                                                                         txn->invalidations_distributed);
+               }
 
                if (using_subtxn)
                        RollbackAndReleaseCurrentSubTransaction();
@@ -2620,8 +2648,17 @@ ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn,
                AbortCurrentTransaction();
 
                /* make sure there's no cache pollution */
-               ReorderBufferExecuteInvalidations(txn->ninvalidations,
-                                                                                 txn->invalidations);
+               if (rbtxn_distr_inval_overflowed(txn))
+               {
+                       Assert(txn->ninvalidations_distributed == 0);
+                       InvalidateSystemCaches();
+               }
+               else
+               {
+                       ReorderBufferExecuteInvalidations(txn->ninvalidations, txn->invalidations);
+                       ReorderBufferExecuteInvalidations(txn->ninvalidations_distributed,
+                                                                                         txn->invalidations_distributed);
+               }
 
                if (using_subtxn)
                        RollbackAndReleaseCurrentSubTransaction();
@@ -2951,7 +2988,8 @@ ReorderBufferAbort(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn,
                 * We might have decoded changes for this transaction that could load
                 * the cache as per the current transaction's view (consider DDL's
                 * happened in this transaction). We don't want the decoding of future
-                * transactions to use those cache entries so execute invalidations.
+                * transactions to use those cache entries so execute only the inval
+                * messages in this transaction.
                 */
                if (txn->ninvalidations > 0)
                        ReorderBufferImmediateInvalidation(rb, txn->ninvalidations,
@@ -3038,9 +3076,10 @@ ReorderBufferForget(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn)
        txn->final_lsn = lsn;
 
        /*
-        * Process cache invalidation messages if there are any. Even if we're not
-        * interested in the transaction's contents, it could have manipulated the
-        * catalog and we need to update the caches according to that.
+        * Process only cache invalidation messages in this transaction if there
+        * are any. Even if we're not interested in the transaction's contents, it
+        * could have manipulated the catalog and we need to update the caches
+        * according to that.
         */
        if (txn->base_snapshot != NULL && txn->ninvalidations > 0)
                ReorderBufferImmediateInvalidation(rb, txn->ninvalidations,
@@ -3312,6 +3351,57 @@ ReorderBufferAddNewTupleCids(ReorderBuffer *rb, TransactionId xid,
        txn->ntuplecids++;
 }
 
+/*
+ * Add new invalidation messages to the reorder buffer queue.
+ */
+static void
+ReorderBufferQueueInvalidations(ReorderBuffer *rb, TransactionId xid,
+                                                               XLogRecPtr lsn, Size nmsgs,
+                                                               SharedInvalidationMessage *msgs)
+{
+       ReorderBufferChange *change;
+
+       change = ReorderBufferGetChange(rb);
+       change->action = REORDER_BUFFER_CHANGE_INVALIDATION;
+       change->data.inval.ninvalidations = nmsgs;
+       change->data.inval.invalidations = (SharedInvalidationMessage *)
+               palloc(sizeof(SharedInvalidationMessage) * nmsgs);
+       memcpy(change->data.inval.invalidations, msgs,
+                  sizeof(SharedInvalidationMessage) * nmsgs);
+
+       ReorderBufferQueueChange(rb, xid, lsn, change, false);
+}
+
+/*
+ * A helper function for ReorderBufferAddInvalidations() and
+ * ReorderBufferAddDistributedInvalidations() that appends the given
+ * invalidation messages to the array *invals_out and updates *ninvals_out.
+ */
+static void
+ReorderBufferAccumulateInvalidations(SharedInvalidationMessage **invals_out,
+                                                                        uint32 *ninvals_out,
+                                                                        SharedInvalidationMessage *msgs_new,
+                                                                        Size nmsgs_new)
+{
+       if (*ninvals_out == 0)
+       {
+               *ninvals_out = nmsgs_new;
+               *invals_out = (SharedInvalidationMessage *)
+                       palloc(sizeof(SharedInvalidationMessage) * nmsgs_new);
+               memcpy(*invals_out, msgs_new, sizeof(SharedInvalidationMessage) * nmsgs_new);
+       }
+       else
+       {
+               /* Enlarge the array of inval messages */
+               *invals_out = (SharedInvalidationMessage *)
+                       repalloc(*invals_out, sizeof(SharedInvalidationMessage) *
+                                        (*ninvals_out + nmsgs_new));
+               memcpy(*invals_out + *ninvals_out, msgs_new,
+                          nmsgs_new * sizeof(SharedInvalidationMessage));
+               *ninvals_out += nmsgs_new;
+       }
+}
+
 /*
  * Accumulate the invalidations for executing them later.
  *
@@ -3332,7 +3422,6 @@ ReorderBufferAddInvalidations(ReorderBuffer *rb, TransactionId xid,
 {
        ReorderBufferTXN *txn;
        MemoryContext oldcontext;
-       ReorderBufferChange *change;
 
        txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
 
@@ -3347,35 +3436,76 @@ ReorderBufferAddInvalidations(ReorderBuffer *rb, TransactionId xid,
 
        Assert(nmsgs > 0);
 
-       /* Accumulate invalidations. */
-       if (txn->ninvalidations == 0)
-       {
-               txn->ninvalidations = nmsgs;
-               txn->invalidations = (SharedInvalidationMessage *)
-                       palloc(sizeof(SharedInvalidationMessage) * nmsgs);
-               memcpy(txn->invalidations, msgs,
-                          sizeof(SharedInvalidationMessage) * nmsgs);
-       }
-       else
+       ReorderBufferAccumulateInvalidations(&txn->invalidations,
+                                                                                &txn->ninvalidations,
+                                                                                msgs, nmsgs);
+
+       ReorderBufferQueueInvalidations(rb, xid, lsn, nmsgs, msgs);
+
+       MemoryContextSwitchTo(oldcontext);
+}
+
+/*
+ * Accumulate the invalidations distributed by other committed transactions
+ * for executing them later.
+ *
+ * This function is similar to ReorderBufferAddInvalidations() but stores
+ * the given inval messages to the txn->invalidations_distributed with the
+ * overflow check.
+ *
+ * This needs to be called by committed transactions to distribute their
+ * inval messages to in-progress transactions.
+ */
+void
+ReorderBufferAddDistributedInvalidations(ReorderBuffer *rb, TransactionId xid,
+                                                                                XLogRecPtr lsn, Size nmsgs,
+                                                                                SharedInvalidationMessage *msgs)
+{
+       ReorderBufferTXN *txn;
+       MemoryContext oldcontext;
+
+       txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
+
+       oldcontext = MemoryContextSwitchTo(rb->context);
+
+       /*
+        * Collect all the invalidations under the top transaction, if available,
+        * so that we can execute them all together.  See the comments in
+        * ReorderBufferAddInvalidations().
+        */
+       txn = rbtxn_get_toptxn(txn);
+
+       Assert(nmsgs > 0);
+
+       if (!rbtxn_distr_inval_overflowed(txn))
        {
-               txn->invalidations = (SharedInvalidationMessage *)
-                       repalloc(txn->invalidations, sizeof(SharedInvalidationMessage) *
-                                        (txn->ninvalidations + nmsgs));
+               /*
+                * Check whether the transaction has enough space for storing
+                * distributed invalidation messages.
+                */
+               if (txn->ninvalidations_distributed + nmsgs >= MAX_DISTR_INVAL_MSG_PER_TXN)
+               {
+                       /*
+                        * Mark the transaction's distributed invalidation messages as
+                        * overflowed and free up the messages accumulated so far.
+                        */
+                       txn->txn_flags |= RBTXN_DISTR_INVAL_OVERFLOWED;
 
-               memcpy(txn->invalidations + txn->ninvalidations, msgs,
-                          nmsgs * sizeof(SharedInvalidationMessage));
-               txn->ninvalidations += nmsgs;
+                       if (txn->invalidations_distributed)
+                       {
+                               pfree(txn->invalidations_distributed);
+                               txn->invalidations_distributed = NULL;
+                               txn->ninvalidations_distributed = 0;
+                       }
+               }
+               else
+                       ReorderBufferAccumulateInvalidations(&txn->invalidations_distributed,
+                                                                                                &txn->ninvalidations_distributed,
+                                                                                                msgs, nmsgs);
        }
 
-       change = ReorderBufferGetChange(rb);
-       change->action = REORDER_BUFFER_CHANGE_INVALIDATION;
-       change->data.inval.ninvalidations = nmsgs;
-       change->data.inval.invalidations = (SharedInvalidationMessage *)
-               palloc(sizeof(SharedInvalidationMessage) * nmsgs);
-       memcpy(change->data.inval.invalidations, msgs,
-                  sizeof(SharedInvalidationMessage) * nmsgs);
-
-       ReorderBufferQueueChange(rb, xid, lsn, change, false);
+       /* Queue the invalidation messages into the transaction */
+       ReorderBufferQueueInvalidations(rb, xid, lsn, nmsgs, msgs);
 
        MemoryContextSwitchTo(oldcontext);
 }
index 110e0b0a0449e60ee70ea11e4a686603a47544a2..5c3bbf0e93f8c07dae9c0722138f9a2393d8c1f0 100644 (file)
@@ -932,6 +932,13 @@ SnapBuildDistributeSnapshotAndInval(SnapBuild *builder, XLogRecPtr lsn, Transact
                 * contents built by the current transaction even after its decoding,
                 * which should have been invalidated due to concurrent catalog
                 * changing transaction.
+                *
+                * Distribute only the invalidation messages generated by the current
+                * committed transaction. Invalidation messages received from other
+                * transactions would have already been propagated to the relevant
+                * in-progress transactions. This transaction would have processed
+                * those invalidations, ensuring that subsequent transactions observe
+                * a consistent cache state.
                 */
                if (txn->xid != xid)
                {
@@ -945,8 +952,9 @@ SnapBuildDistributeSnapshotAndInval(SnapBuild *builder, XLogRecPtr lsn, Transact
                        {
                                Assert(msgs != NULL);
 
-                               ReorderBufferAddInvalidations(builder->reorder, txn->xid, lsn,
-                                                                                         ninvalidations, msgs);
+                               ReorderBufferAddDistributedInvalidations(builder->reorder,
+                                                                                                                txn->xid, lsn,
+                                                                                                                ninvalidations, msgs);
                        }
                }
        }
index 4c56f219fd8fdd1108bfd9c44111db255c0a7e87..0cfa9005141a1814e5920479debe3fc2b972abf7 100644 (file)
@@ -159,15 +159,16 @@ typedef struct ReorderBufferChange
 } ReorderBufferChange;
 
 /* ReorderBufferTXN txn_flags */
-#define RBTXN_HAS_CATALOG_CHANGES      0x0001
-#define RBTXN_IS_SUBXACT               0x0002
-#define RBTXN_IS_SERIALIZED            0x0004
-#define RBTXN_IS_SERIALIZED_CLEAR      0x0008
-#define RBTXN_IS_STREAMED              0x0010
-#define RBTXN_HAS_PARTIAL_CHANGE       0x0020
-#define RBTXN_PREPARE                  0x0040
-#define RBTXN_SKIPPED_PREPARE          0x0080
-#define RBTXN_HAS_STREAMABLE_CHANGE    0x0100
+#define RBTXN_HAS_CATALOG_CHANGES              0x0001
+#define RBTXN_IS_SUBXACT                       0x0002
+#define RBTXN_IS_SERIALIZED                    0x0004
+#define RBTXN_IS_SERIALIZED_CLEAR              0x0008
+#define RBTXN_IS_STREAMED                      0x0010
+#define RBTXN_HAS_PARTIAL_CHANGE               0x0020
+#define RBTXN_PREPARE                          0x0040
+#define RBTXN_SKIPPED_PREPARE                  0x0080
+#define RBTXN_HAS_STREAMABLE_CHANGE            0x0100
+#define RBTXN_DISTR_INVAL_OVERFLOWED   0x0200
 
 /* Does the transaction have catalog changes? */
 #define rbtxn_has_catalog_changes(txn) \
@@ -231,6 +232,12 @@ typedef struct ReorderBufferChange
        ((txn)->txn_flags & RBTXN_SKIPPED_PREPARE) != 0 \
 )
 
+/* Has the array of distributed inval messages overflowed? */
+#define rbtxn_distr_inval_overflowed(txn) \
+( \
+       ((txn)->txn_flags & RBTXN_DISTR_INVAL_OVERFLOWED) != 0 \
+)
+
 /* Is this a top-level transaction? */
 #define rbtxn_is_toptxn(txn) \
 ( \
@@ -422,6 +429,12 @@ typedef struct ReorderBufferTXN
         * Private data pointer of the output plugin.
         */
        void       *output_plugin_private;
+
+       /*
+        * Stores cache invalidation messages distributed by other transactions.
+        */
+       uint32          ninvalidations_distributed;
+       SharedInvalidationMessage *invalidations_distributed;
 } ReorderBufferTXN;
 
 /* so we can define the callbacks used inside struct ReorderBuffer itself */
@@ -709,6 +722,9 @@ extern void ReorderBufferAddNewTupleCids(ReorderBuffer *rb, TransactionId xid,
                                                                                 CommandId cmin, CommandId cmax, CommandId combocid);
 extern void ReorderBufferAddInvalidations(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn,
                                                                                  Size nmsgs, SharedInvalidationMessage *msgs);
+extern void ReorderBufferAddDistributedInvalidations(ReorderBuffer *rb, TransactionId xid,
+                                                                                                        XLogRecPtr lsn, Size nmsgs,
+                                                                                                        SharedInvalidationMessage *msgs);
 extern void ReorderBufferImmediateInvalidation(ReorderBuffer *rb, uint32 ninvalidations,
                                                                                           SharedInvalidationMessage *invalidations);
 extern void ReorderBufferProcessXid(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn);