]> git.ipfire.org Git - thirdparty/postgresql.git/commitdiff
Fix decoding of speculative aborts.
authorAmit Kapila <akapila@postgresql.org>
Tue, 15 Jun 2021 03:48:38 +0000 (09:18 +0530)
committerAmit Kapila <akapila@postgresql.org>
Tue, 15 Jun 2021 03:48:38 +0000 (09:18 +0530)
During decoding for speculative inserts, we were relying for cleaning
toast hash on confirmation records or next change records. But that
could lead to multiple problems (a) memory leak if there is neither a
confirmation record nor any other record after toast insertion for a
speculative insert in the transaction, (b) error and assertion failures
if the next operation is not an insert/update on the same table.

The fix is to start queuing spec abort change and clean up toast hash
and change record during its processing. Currently, we are queuing the
spec aborts for both toast and main table even though we perform cleanup
while processing the main table's spec abort record. Later, if we have a
way to distinguish between the spec abort record of toast and the main
table, we can avoid queuing the change for spec aborts of toast tables.

Reported-by: Ashutosh Bapat
Author: Dilip Kumar
Reviewed-by: Amit Kapila
Backpatch-through: 9.6, where it was introduced
Discussion: https://postgr.es/m/CAExHW5sPKF-Oovx_qZe4p5oM6Dvof7_P+XgsNAViug15Fm99jA@mail.gmail.com

src/backend/replication/logical/decode.c
src/backend/replication/logical/reorderbuffer.c
src/include/replication/reorderbuffer.h

index 130090203708f1bfbc9a1e7c87898ffbaf929a91..571a9019f3ff82a279dda4babe6f56a955525459 100644 (file)
@@ -778,19 +778,17 @@ DecodeDelete(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
        if (target_node.dbNode != ctx->slot->data.database)
                return;
 
-       /*
-        * Super deletions are irrelevant for logical decoding, it's driven by the
-        * confirmation records.
-        */
-       if (xlrec->flags & XLH_DELETE_IS_SUPER)
-               return;
-
        /* output plugin doesn't look for this origin, no need to queue */
        if (FilterByOrigin(ctx, XLogRecGetOrigin(r)))
                return;
 
        change = ReorderBufferGetChange(ctx->reorder);
-       change->action = REORDER_BUFFER_CHANGE_DELETE;
+
+       if (xlrec->flags & XLH_DELETE_IS_SUPER)
+               change->action = REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ABORT;
+       else
+               change->action = REORDER_BUFFER_CHANGE_DELETE;
+
        change->origin_id = XLogRecGetOrigin(r);
 
        memcpy(&change->data.tp.relnode, &target_node, sizeof(RelFileNode));
index f0de337fe92ae332f6c64ade61959780719d1262..1cd0bbd07d84e1f47aabcc80b47c2d469482fa2c 100644 (file)
@@ -364,6 +364,9 @@ ReorderBufferReturnTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
                txn->invalidations = NULL;
        }
 
+       /* Reset the toast hash */
+       ReorderBufferToastReset(rb, txn);
+
        /* check whether to put into the slab cache */
        if (rb->nr_cached_transactions < max_cached_transactions)
        {
@@ -449,6 +452,7 @@ ReorderBufferReturnChange(ReorderBuffer *rb, ReorderBufferChange *change)
                        break;
                        /* no data in addition to the struct itself */
                case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM:
+               case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ABORT:
                case REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID:
                case REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID:
                        break;
@@ -1674,8 +1678,8 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid,
                        change_done:
 
                                        /*
-                                        * Either speculative insertion was confirmed, or it was
-                                        * unsuccessful and the record isn't needed anymore.
+                                        * If speculative insertion was confirmed, the record isn't
+                                        * needed anymore.
                                         */
                                        if (specinsert != NULL)
                                        {
@@ -1717,6 +1721,32 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid,
                                        specinsert = change;
                                        break;
 
+                               case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ABORT:
+
+                                       /*
+                                        * Abort for speculative insertion arrived. So cleanup the
+                                        * specinsert tuple and toast hash.
+                                        *
+                                        * Note that we get the spec abort change for each toast
+                                        * entry but we need to perform the cleanup only the first
+                                        * time we get it for the main table.
+                                        */
+                                       if (specinsert != NULL)
+                                       {
+                                               /*
+                                                * We must clean the toast hash before processing a
+                                                * completely new tuple to avoid confusion about the
+                                                * previous tuple's toast chunks.
+                                                */
+                                               Assert(change->data.tp.clear_toast_afterwards);
+                                               ReorderBufferToastReset(rb, txn);
+
+                                               /* We don't need this record anymore. */
+                                               ReorderBufferReturnChange(rb, specinsert);
+                                               specinsert = NULL;
+                                       }
+                                       break;
+
                                case REORDER_BUFFER_CHANGE_MESSAGE:
                                        rb->message(rb, txn, change->lsn, true,
                                                                change->data.msg.prefix,
@@ -1792,16 +1822,8 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid,
                        }
                }
 
-               /*
-                * There's a speculative insertion remaining, just clean in up, it
-                * can't have been successful, otherwise we'd gotten a confirmation
-                * record.
-                */
-               if (specinsert)
-               {
-                       ReorderBufferReturnChange(rb, specinsert);
-                       specinsert = NULL;
-               }
+               /* speculative insertion record must be freed by now */
+               Assert(!specinsert);
 
                /* clean up the iterator */
                ReorderBufferIterTXNFinish(rb, iterstate);
@@ -2476,6 +2498,7 @@ ReorderBufferSerializeChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
                                break;
                        }
                case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM:
+               case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ABORT:
                case REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID:
                case REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID:
                        /* ReorderBufferChange contains everything important */
@@ -2764,6 +2787,7 @@ ReorderBufferRestoreChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
                        }
                        /* the base struct contains all the data, easy peasy */
                case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM:
+               case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ABORT:
                case REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID:
                case REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID:
                        break;
index e085088f691efdda1439c9b3cb52f02bd7e4b990..ddba4bd133f131619818e848a45468ad7baf7592 100644 (file)
@@ -44,10 +44,10 @@ typedef struct ReorderBufferTupleBuf
  * changes. Users of the decoding facilities will never see changes with
  * *_INTERNAL_* actions.
  *
- * The INTERNAL_SPEC_INSERT and INTERNAL_SPEC_CONFIRM changes concern
- * "speculative insertions", and their confirmation respectively.  They're
- * used by INSERT .. ON CONFLICT .. UPDATE.  Users of logical decoding don't
- * have to care about these.
+ * The INTERNAL_SPEC_INSERT and INTERNAL_SPEC_CONFIRM, and INTERNAL_SPEC_ABORT
+ * changes concern "speculative insertions", their confirmation, and abort
+ * respectively.  They're used by INSERT .. ON CONFLICT .. UPDATE.  Users of
+ * logical decoding don't have to care about these.
  */
 enum ReorderBufferChangeType
 {
@@ -59,7 +59,8 @@ enum ReorderBufferChangeType
        REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID,
        REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID,
        REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT,
-       REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM
+       REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM,
+       REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ABORT
 };
 
 /*