]> git.ipfire.org Git - thirdparty/postgresql.git/commitdiff
Back-patch fixes for problems with VACUUM destroying t_ctid chains too soon,
authorTom Lane <tgl@sss.pgh.pa.us>
Thu, 25 Aug 2005 19:45:06 +0000 (19:45 +0000)
committerTom Lane <tgl@sss.pgh.pa.us>
Thu, 25 Aug 2005 19:45:06 +0000 (19:45 +0000)
and with insufficient paranoia in code that follows t_ctid links.
This patch covers the 8.0 branch.

src/backend/access/heap/heapam.c
src/backend/commands/async.c
src/backend/commands/trigger.c
src/backend/commands/vacuum.c
src/backend/executor/execMain.c
src/backend/utils/time/tqual.c
src/include/access/heapam.h
src/include/executor/executor.h

index b657679fdded63028da1a451debde26ff66285a2..075e4f542056b1cb0c21bdc5168f40fc4e8a6f17 100644 (file)
@@ -8,7 +8,7 @@
  *
  *
  * IDENTIFICATION
- *       $PostgreSQL: pgsql/src/backend/access/heap/heapam.c,v 1.182 2004/12/31 21:59:16 pgsql Exp $
+ *       $PostgreSQL: pgsql/src/backend/access/heap/heapam.c,v 1.182.4.1 2005/08/25 19:44:52 tgl Exp $
  *
  *
  * INTERFACE ROUTINES
@@ -1015,83 +1015,130 @@ heap_release_fetch(Relation relation,
 
 /*
  *     heap_get_latest_tid -  get the latest tid of a specified tuple
+ *
+ * Actually, this gets the latest version that is visible according to
+ * the passed snapshot.  You can pass SnapshotDirty to get the very latest,
+ * possibly uncommitted version.
+ *
+ * *tid is both an input and an output parameter: it is updated to
+ * show the latest version of the row.  Note that it will not be changed
+ * if no version of the row passes the snapshot test.
  */
-ItemPointer
+void
 heap_get_latest_tid(Relation relation,
                                        Snapshot snapshot,
                                        ItemPointer tid)
 {
-       ItemId          lp = NULL;
-       Buffer          buffer;
-       PageHeader      dp;
-       OffsetNumber offnum;
-       HeapTupleData tp;
-       HeapTupleHeader t_data;
+       BlockNumber     blk;
        ItemPointerData ctid;
-       bool            invalidBlock,
-                               linkend,
-                               valid;
+       TransactionId priorXmax;
 
-       /*
-        * get the buffer from the relation descriptor Note that this does a
-        * buffer pin.
-        */
-       buffer = ReadBuffer(relation, ItemPointerGetBlockNumber(tid));
-       LockBuffer(buffer, BUFFER_LOCK_SHARE);
+       /* this is to avoid Assert failures on bad input */
+       if (!ItemPointerIsValid(tid))
+               return;
 
        /*
-        * get the item line pointer corresponding to the requested tid
+        * Since this can be called with user-supplied TID, don't trust the
+        * input too much.  (RelationGetNumberOfBlocks is an expensive check,
+        * so we don't check t_ctid links again this way.  Note that it would
+        * not do to call it just once and save the result, either.)
         */
-       dp = (PageHeader) BufferGetPage(buffer);
-       offnum = ItemPointerGetOffsetNumber(tid);
-       invalidBlock = true;
-       if (!PageIsNew(dp))
-       {
-               lp = PageGetItemId(dp, offnum);
-               if (ItemIdIsUsed(lp))
-                       invalidBlock = false;
-       }
-       if (invalidBlock)
-       {
-               LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
-               ReleaseBuffer(buffer);
-               return NULL;
-       }
+       blk = ItemPointerGetBlockNumber(tid);
+       if (blk >= RelationGetNumberOfBlocks(relation))
+               elog(ERROR, "block number %u is out of range for relation \"%s\"",
+                        blk, RelationGetRelationName(relation));
 
        /*
-        * more sanity checks
+        * Loop to chase down t_ctid links.  At top of loop, ctid is the
+        * tuple we need to examine, and *tid is the TID we will return if
+        * ctid turns out to be bogus.
+        *
+        * Note that we will loop until we reach the end of the t_ctid chain.
+        * Depending on the snapshot passed, there might be at most one visible
+        * version of the row, but we don't try to optimize for that.
         */
+       ctid = *tid;
+       priorXmax = InvalidTransactionId;       /* cannot check first XMIN */
+       for (;;)
+       {
+               Buffer          buffer;
+               PageHeader      dp;
+               OffsetNumber offnum;
+               ItemId          lp;
+               HeapTupleData tp;
+               bool            valid;
 
-       tp.t_datamcxt = NULL;
-       t_data = tp.t_data = (HeapTupleHeader) PageGetItem((Page) dp, lp);
-       tp.t_len = ItemIdGetLength(lp);
-       tp.t_self = *tid;
-       ctid = tp.t_data->t_ctid;
+               /*
+                * Read, pin, and lock the page.
+                */
+               buffer = ReadBuffer(relation, ItemPointerGetBlockNumber(&ctid));
+               LockBuffer(buffer, BUFFER_LOCK_SHARE);
+               dp = (PageHeader) BufferGetPage(buffer);
 
-       /*
-        * check time qualification of tid
-        */
+               /*
+                * Check for bogus item number.  This is not treated as an error
+                * condition because it can happen while following a t_ctid link.
+                * We just assume that the prior tid is OK and return it unchanged.
+                */
+               offnum = ItemPointerGetOffsetNumber(&ctid);
+               if (offnum < FirstOffsetNumber || offnum > PageGetMaxOffsetNumber(dp))
+               {
+                       LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
+                       ReleaseBuffer(buffer);
+                       break;
+               }
+               lp = PageGetItemId(dp, offnum);
+               if (!ItemIdIsUsed(lp))
+               {
+                       LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
+                       ReleaseBuffer(buffer);
+                       break;
+               }
 
-       HeapTupleSatisfies(&tp, relation, buffer, dp,
-                                          snapshot, 0, NULL, valid);
+               /* OK to access the tuple */
+               tp.t_self = ctid;
+               tp.t_datamcxt = NULL;
+               tp.t_data = (HeapTupleHeader) PageGetItem(dp, lp);
+               tp.t_len = ItemIdGetLength(lp);
 
-       linkend = true;
-       if ((t_data->t_infomask & HEAP_XMIN_COMMITTED) != 0 &&
-               !ItemPointerEquals(tid, &ctid))
-               linkend = false;
+               /*
+                * After following a t_ctid link, we might arrive at an unrelated
+                * tuple.  Check for XMIN match.
+                */
+               if (TransactionIdIsValid(priorXmax) &&
+                       !TransactionIdEquals(priorXmax, HeapTupleHeaderGetXmin(tp.t_data)))
+               {
+                       LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
+                       ReleaseBuffer(buffer);
+                       break;
+               }
 
-       LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
-       ReleaseBuffer(buffer);
+               /*
+                * Check time qualification of tuple; if visible, set it as the new
+                * result candidate.
+                */
+               HeapTupleSatisfies(&tp, relation, buffer, dp,
+                                                  snapshot, 0, NULL, valid);
+               if (valid)
+                       *tid = ctid;
 
-       if (!valid)
-       {
-               if (linkend)
-                       return NULL;
-               heap_get_latest_tid(relation, snapshot, &ctid);
-               *tid = ctid;
-       }
+               /*
+                * If there's a valid t_ctid link, follow it, else we're done.
+                */
+               if ((tp.t_data->t_infomask & (HEAP_XMAX_INVALID |
+                                                                         HEAP_MARKED_FOR_UPDATE)) ||
+                       ItemPointerEquals(&tp.t_self, &tp.t_data->t_ctid))
+               {
+                       LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
+                       ReleaseBuffer(buffer);
+                       break;
+               }
 
-       return tid;
+               ctid = tp.t_data->t_ctid;
+               priorXmax = HeapTupleHeaderGetXmax(tp.t_data);
+               LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
+               ReleaseBuffer(buffer);
+       }                               /* end of loop */
 }
 
 /*
@@ -1250,29 +1297,34 @@ simple_heap_insert(Relation relation, HeapTuple tup)
 }
 
 /*
- *     heap_delete             - delete a tuple
+ *     heap_delete - delete a tuple
  *
  * NB: do not call this directly unless you are prepared to deal with
  * concurrent-update conditions.  Use simple_heap_delete instead.
  *
- *     relation - table to be modified
+ *     relation - table to be modified (caller must hold suitable lock)
  *     tid - TID of tuple to be deleted
  *     ctid - output parameter, used only for failure case (see below)
- *     cid - delete command ID to use in verifying tuple visibility
+ *     update_xmax - output parameter, used only for failure case (see below)
+ *     cid - delete command ID (used for visibility test, and stored into
+ *             cmax if successful)
  *     crosscheck - if not InvalidSnapshot, also check tuple against this
  *     wait - true if should wait for any conflicting update to commit/abort
  *
  * Normal, successful return value is HeapTupleMayBeUpdated, which
  * actually means we did delete it.  Failure return codes are
  * HeapTupleSelfUpdated, HeapTupleUpdated, or HeapTupleBeingUpdated
- * (the last only possible if wait == false).  On a failure return,
- * *ctid is set to the ctid link of the target tuple (possibly a later
- * version of the row).
+ * (the last only possible if wait == false).
+ *
+ * In the failure cases, the routine returns the tuple's t_ctid and t_xmax.
+ * If t_ctid is the same as tid, the tuple was deleted; if different, the
+ * tuple was updated, and t_ctid is the location of the replacement tuple.
+ * (t_xmax is needed to verify that the replacement tuple matches.)
  */
 int
 heap_delete(Relation relation, ItemPointer tid,
-                       ItemPointer ctid, CommandId cid,
-                       Snapshot crosscheck, bool wait)
+                       ItemPointer ctid, TransactionId *update_xmax,
+                       CommandId cid, Snapshot crosscheck, bool wait)
 {
        TransactionId xid = GetCurrentTransactionId();
        ItemId          lp;
@@ -1288,11 +1340,11 @@ heap_delete(Relation relation, ItemPointer tid,
 
        dp = (PageHeader) BufferGetPage(buffer);
        lp = PageGetItemId(dp, ItemPointerGetOffsetNumber(tid));
+
        tp.t_datamcxt = NULL;
-       tp.t_data = (HeapTupleHeader) PageGetItem((Page) dp, lp);
+       tp.t_data = (HeapTupleHeader) PageGetItem(dp, lp);
        tp.t_len = ItemIdGetLength(lp);
        tp.t_self = *tid;
-       tp.t_tableOid = relation->rd_id;
 
 l1:
        result = HeapTupleSatisfiesUpdate(tp.t_data, cid, buffer);
@@ -1346,7 +1398,9 @@ l1:
                Assert(result == HeapTupleSelfUpdated ||
                           result == HeapTupleUpdated ||
                           result == HeapTupleBeingUpdated);
+               Assert(!(tp.t_data->t_infomask & HEAP_XMAX_INVALID));
                *ctid = tp.t_data->t_ctid;
+               *update_xmax = HeapTupleHeaderGetXmax(tp.t_data);
                LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
                ReleaseBuffer(buffer);
                return result;
@@ -1433,11 +1487,12 @@ l1:
 void
 simple_heap_delete(Relation relation, ItemPointer tid)
 {
-       ItemPointerData ctid;
        int                     result;
+       ItemPointerData update_ctid;
+       TransactionId update_xmax;
 
        result = heap_delete(relation, tid,
-                                                &ctid,
+                                                &update_ctid, &update_xmax,
                                                 GetCurrentCommandId(), InvalidSnapshot,
                                                 true /* wait for commit */ );
        switch (result)
@@ -1467,27 +1522,33 @@ simple_heap_delete(Relation relation, ItemPointer tid)
  * NB: do not call this directly unless you are prepared to deal with
  * concurrent-update conditions.  Use simple_heap_update instead.
  *
- *     relation - table to be modified
+ *     relation - table to be modified (caller must hold suitable lock)
  *     otid - TID of old tuple to be replaced
  *     newtup - newly constructed tuple data to store
  *     ctid - output parameter, used only for failure case (see below)
- *     cid - update command ID to use in verifying old tuple visibility
+ *     update_xmax - output parameter, used only for failure case (see below)
+ *     cid - update command ID (used for visibility test, and stored into
+ *             cmax/cmin if successful)
  *     crosscheck - if not InvalidSnapshot, also check old tuple against this
  *     wait - true if should wait for any conflicting update to commit/abort
  *
  * Normal, successful return value is HeapTupleMayBeUpdated, which
  * actually means we *did* update it.  Failure return codes are
  * HeapTupleSelfUpdated, HeapTupleUpdated, or HeapTupleBeingUpdated
- * (the last only possible if wait == false).  On a failure return,
- * *ctid is set to the ctid link of the old tuple (possibly a later
- * version of the row).
+ * (the last only possible if wait == false).
+ *
  * On success, newtup->t_self is set to the TID where the new tuple
  * was inserted.
+ *
+ * In the failure cases, the routine returns the tuple's t_ctid and t_xmax.
+ * If t_ctid is the same as otid, the tuple was deleted; if different, the
+ * tuple was updated, and t_ctid is the location of the replacement tuple.
+ * (t_xmax is needed to verify that the replacement tuple matches.)
  */
 int
 heap_update(Relation relation, ItemPointer otid, HeapTuple newtup,
-                       ItemPointer ctid, CommandId cid,
-                       Snapshot crosscheck, bool wait)
+                       ItemPointer ctid, TransactionId *update_xmax,
+                       CommandId cid, Snapshot crosscheck, bool wait)
 {
        TransactionId xid = GetCurrentTransactionId();
        ItemId          lp;
@@ -1573,7 +1634,9 @@ l2:
                Assert(result == HeapTupleSelfUpdated ||
                           result == HeapTupleUpdated ||
                           result == HeapTupleBeingUpdated);
+               Assert(!(oldtup.t_data->t_infomask & HEAP_XMAX_INVALID));
                *ctid = oldtup.t_data->t_ctid;
+               *update_xmax = HeapTupleHeaderGetXmax(oldtup.t_data);
                LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
                ReleaseBuffer(buffer);
                return result;
@@ -1795,11 +1858,12 @@ l2:
 void
 simple_heap_update(Relation relation, ItemPointer otid, HeapTuple tup)
 {
-       ItemPointerData ctid;
        int                     result;
+       ItemPointerData update_ctid;
+       TransactionId update_xmax;
 
        result = heap_update(relation, otid, tup,
-                                                &ctid,
+                                                &update_ctid, &update_xmax,
                                                 GetCurrentCommandId(), InvalidSnapshot,
                                                 true /* wait for commit */ );
        switch (result)
@@ -1825,9 +1889,34 @@ simple_heap_update(Relation relation, ItemPointer otid, HeapTuple tup)
 
 /*
  *     heap_mark4update                - mark a tuple for update
+ *
+ * Note that this acquires a buffer pin, which the caller must release.
+ *
+ * Input parameters:
+ *     relation: relation containing tuple (caller must hold suitable lock)
+ *     tuple->t_self: TID of tuple to lock (rest of struct need not be valid)
+ *     cid: current command ID (used for visibility test, and stored into
+ *             tuple's cmax if lock is successful)
+ *
+ * Output parameters:
+ *     *tuple: all fields filled in
+ *     *buffer: set to buffer holding tuple (pinned but not locked at exit)
+ *     *ctid: set to tuple's t_ctid, but only in failure cases
+ *     *update_xmax: set to tuple's xmax, but only in failure cases
+ *
+ * Function result may be:
+ *     HeapTupleMayBeUpdated: lock was successfully acquired
+ *     HeapTupleSelfUpdated: lock failed because tuple updated by self
+ *     HeapTupleUpdated: lock failed because tuple updated by other xact
+ *
+ * In the failure cases, the routine returns the tuple's t_ctid and t_xmax.
+ * If t_ctid is the same as t_self, the tuple was deleted; if different, the
+ * tuple was updated, and t_ctid is the location of the replacement tuple.
+ * (t_xmax is needed to verify that the replacement tuple matches.)
  */
 int
 heap_mark4update(Relation relation, HeapTuple tuple, Buffer *buffer,
+                                ItemPointer ctid, TransactionId *update_xmax,
                                 CommandId cid)
 {
        TransactionId xid = GetCurrentTransactionId();
@@ -1841,9 +1930,12 @@ heap_mark4update(Relation relation, HeapTuple tuple, Buffer *buffer,
 
        dp = (PageHeader) BufferGetPage(*buffer);
        lp = PageGetItemId(dp, ItemPointerGetOffsetNumber(tid));
+       Assert(ItemIdIsUsed(lp));
+
        tuple->t_datamcxt = NULL;
        tuple->t_data = (HeapTupleHeader) PageGetItem((Page) dp, lp);
        tuple->t_len = ItemIdGetLength(lp);
+       tuple->t_tableOid = RelationGetRelid(relation);
 
 l3:
        result = HeapTupleSatisfiesUpdate(tuple->t_data, cid, *buffer);
@@ -1887,7 +1979,9 @@ l3:
        if (result != HeapTupleMayBeUpdated)
        {
                Assert(result == HeapTupleSelfUpdated || result == HeapTupleUpdated);
-               tuple->t_self = tuple->t_data->t_ctid;
+               Assert(!(tuple->t_data->t_infomask & HEAP_XMAX_INVALID));
+               *ctid = tuple->t_data->t_ctid;
+               *update_xmax = HeapTupleHeaderGetXmax(tuple->t_data);
                LockBuffer(*buffer, BUFFER_LOCK_UNLOCK);
                return result;
        }
index e93d9b711636e58b4d60b549b5b4620a4eaeeb9f..7981dbeb80595de3d9a61656dc5e4720704eb967 100644 (file)
@@ -7,7 +7,7 @@
  * Portions Copyright (c) 1994, Regents of the University of California
  *
  * IDENTIFICATION
- *       $PostgreSQL: pgsql/src/backend/commands/async.c,v 1.118 2004/12/31 21:59:41 pgsql Exp $
+ *       $PostgreSQL: pgsql/src/backend/commands/async.c,v 1.118.4.1 2005/08/25 19:44:56 tgl Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -520,8 +520,9 @@ AtCommit_Notify(void)
                        }
                        else if (listener->notification == 0)
                        {
-                               ItemPointerData ctid;
                                int                     result;
+                               ItemPointerData update_ctid;
+                               TransactionId update_xmax;
 
                                rTuple = heap_modifytuple(lTuple, lRel,
                                                                                  value, nulls, repl);
@@ -543,7 +544,7 @@ AtCommit_Notify(void)
                                 * heap_update calls.
                                 */
                                result = heap_update(lRel, &lTuple->t_self, rTuple,
-                                                                        &ctid,
+                                                                        &update_ctid, &update_xmax,
                                                                         GetCurrentCommandId(), InvalidSnapshot,
                                                                         false /* no wait for commit */ );
                                switch (result)
index 41e3b74c471ae5ab3fceae4a3aef8bcb4e3dcda6..ec998aa7b9f76ab6679b9de85f7bc79af9f073e4 100644 (file)
@@ -7,7 +7,7 @@
  * Portions Copyright (c) 1994, Regents of the University of California
  *
  * IDENTIFICATION
- *       $PostgreSQL: pgsql/src/backend/commands/trigger.c,v 1.177.4.1 2005/04/11 19:51:31 tgl Exp $
+ *       $PostgreSQL: pgsql/src/backend/commands/trigger.c,v 1.177.4.2 2005/08/25 19:44:57 tgl Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -1567,14 +1567,18 @@ GetTupleForTrigger(EState *estate, ResultRelInfo *relinfo,
        if (newSlot != NULL)
        {
                int                     test;
+               ItemPointerData update_ctid;
+               TransactionId update_xmax;
+
+               *newSlot = NULL;
 
                /*
                 * mark tuple for update
                 */
-               *newSlot = NULL;
-               tuple.t_self = *tid;
 ltrmark:;
-               test = heap_mark4update(relation, &tuple, &buffer, cid);
+               tuple.t_self = *tid;
+               test = heap_mark4update(relation, &tuple, &buffer,
+                                                               &update_ctid, &update_xmax, cid);
                switch (test)
                {
                        case HeapTupleSelfUpdated:
@@ -1591,15 +1595,18 @@ ltrmark:;
                                        ereport(ERROR,
                                                        (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
                                                         errmsg("could not serialize access due to concurrent update")));
-                               else if (!(ItemPointerEquals(&(tuple.t_self), tid)))
+                               else if (!ItemPointerEquals(&update_ctid, &tuple.t_self))
                                {
-                                       TupleTableSlot *epqslot = EvalPlanQual(estate,
-                                                                                        relinfo->ri_RangeTableIndex,
-                                                                                                               &(tuple.t_self));
-
-                                       if (!(TupIsNull(epqslot)))
+                                       /* it was updated, so look at the updated version */
+                                       TupleTableSlot *epqslot;
+
+                                       epqslot = EvalPlanQual(estate,
+                                                                                  relinfo->ri_RangeTableIndex,
+                                                                                  &update_ctid,
+                                                                                  update_xmax);
+                                       if (!TupIsNull(epqslot))
                                        {
-                                               *tid = tuple.t_self;
+                                               *tid = update_ctid;
                                                *newSlot = epqslot;
                                                goto ltrmark;
                                        }
@@ -1634,6 +1641,7 @@ ltrmark:;
                tuple.t_data = (HeapTupleHeader) PageGetItem((Page) dp, lp);
                tuple.t_len = ItemIdGetLength(lp);
                tuple.t_self = *tid;
+               tuple.t_tableOid = RelationGetRelid(relation);
        }
 
        result = heap_copytuple(&tuple);
index cdfc02328d2a94fca3c9a8fcba96c5f04e84b99a..b5ece5729f5b3707a494bf98174449864c79a827 100644 (file)
@@ -13,7 +13,7 @@
  *
  *
  * IDENTIFICATION
- *       $PostgreSQL: pgsql/src/backend/commands/vacuum.c,v 1.299 2004/12/31 21:59:42 pgsql Exp $
+ *       $PostgreSQL: pgsql/src/backend/commands/vacuum.c,v 1.299.4.1 2005/08/25 19:44:58 tgl Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -1817,72 +1817,85 @@ repair_frag(VRelStats *vacrelstats, Relation onerel,
                                        break;          /* out of walk-along-page loop */
                                }
 
-                               vtmove = (VTupleMove) palloc(100 * sizeof(VTupleMoveData));
-                               num_vtmove = 0;
-                               free_vtmove = 100;
-
                                /*
                                 * If this tuple is in the begin/middle of the chain then
-                                * we have to move to the end of chain.
+                                * we have to move to the end of chain.  As with any
+                                * t_ctid chase, we have to verify that each new tuple
+                                * is really the descendant of the tuple we came from.
                                 */
                                while (!(tp.t_data->t_infomask & (HEAP_XMAX_INVALID |
                                                                                          HEAP_MARKED_FOR_UPDATE)) &&
                                           !(ItemPointerEquals(&(tp.t_self),
                                                                                   &(tp.t_data->t_ctid))))
                                {
-                                       Page            Cpage;
-                                       ItemId          Citemid;
-                                       ItemPointerData Ctid;
-
-                                       Ctid = tp.t_data->t_ctid;
-                                       if (freeCbuf)
-                                               ReleaseBuffer(Cbuf);
-                                       freeCbuf = true;
-                                       Cbuf = ReadBuffer(onerel,
-                                                                         ItemPointerGetBlockNumber(&Ctid));
-                                       Cpage = BufferGetPage(Cbuf);
-                                       Citemid = PageGetItemId(Cpage,
-                                                                         ItemPointerGetOffsetNumber(&Ctid));
-                                       if (!ItemIdIsUsed(Citemid))
+                                       ItemPointerData nextTid;
+                                       TransactionId priorXmax;
+                                       Buffer          nextBuf;
+                                       Page            nextPage;
+                                       OffsetNumber nextOffnum;
+                                       ItemId          nextItemid;
+                                       HeapTupleHeader nextTdata;
+
+                                       nextTid = tp.t_data->t_ctid;
+                                       priorXmax = HeapTupleHeaderGetXmax(tp.t_data);
+                                       /* assume block# is OK (see heap_fetch comments) */
+                                       nextBuf = ReadBuffer(onerel,
+                                                                                ItemPointerGetBlockNumber(&nextTid));
+                                       nextPage = BufferGetPage(nextBuf);
+                                       /* If bogus or unused slot, assume tp is end of chain */
+                                       nextOffnum = ItemPointerGetOffsetNumber(&nextTid);
+                                       if (nextOffnum < FirstOffsetNumber ||
+                                               nextOffnum > PageGetMaxOffsetNumber(nextPage))
                                        {
-                                               /*
-                                                * This means that in the middle of chain there
-                                                * was tuple updated by older (than OldestXmin)
-                                                * xaction and this tuple is already deleted by
-                                                * me. Actually, upper part of chain should be
-                                                * removed and seems that this should be handled
-                                                * in scan_heap(), but it's not implemented at the
-                                                * moment and so we just stop shrinking here.
-                                                */
-                                               elog(DEBUG2, "child itemid in update-chain marked as unused --- can't continue repair_frag");
-                                               chain_move_failed = true;
-                                               break;  /* out of loop to move to chain end */
+                                               ReleaseBuffer(nextBuf);
+                                               break;
                                        }
+                                       nextItemid = PageGetItemId(nextPage, nextOffnum);
+                                       if (!ItemIdIsUsed(nextItemid))
+                                       {
+                                               ReleaseBuffer(nextBuf);
+                                               break;
+                                       }
+                                       /* if not matching XMIN, assume tp is end of chain */
+                                       nextTdata = (HeapTupleHeader) PageGetItem(nextPage,
+                                                                                                                         nextItemid);
+                                       if (!TransactionIdEquals(HeapTupleHeaderGetXmin(nextTdata),
+                                                                                        priorXmax))
+                                       {
+                                               ReleaseBuffer(nextBuf);
+                                               break;
+                                       }
+                                       /* OK, switch our attention to the next tuple in chain */
                                        tp.t_datamcxt = NULL;
-                                       tp.t_data = (HeapTupleHeader) PageGetItem(Cpage, Citemid);
-                                       tp.t_self = Ctid;
-                                       tlen = tp.t_len = ItemIdGetLength(Citemid);
-                               }
-                               if (chain_move_failed)
-                               {
+                                       tp.t_data = nextTdata;
+                                       tp.t_self = nextTid;
+                                       tlen = tp.t_len = ItemIdGetLength(nextItemid);
                                        if (freeCbuf)
                                                ReleaseBuffer(Cbuf);
-                                       pfree(vtmove);
-                                       break;          /* out of walk-along-page loop */
+                                       Cbuf = nextBuf;
+                                       freeCbuf = true;
                                }
 
+                               /* Set up workspace for planning the chain move */
+                               vtmove = (VTupleMove) palloc(100 * sizeof(VTupleMoveData));
+                               num_vtmove = 0;
+                               free_vtmove = 100;
+
                                /*
-                                * Check if all items in chain can be moved
+                                * Now, walk backwards up the chain (towards older tuples)
+                                * and check if all items in chain can be moved.  We record
+                                * all the moves that need to be made in the vtmove array.
                                 */
                                for (;;)
                                {
                                        Buffer          Pbuf;
                                        Page            Ppage;
                                        ItemId          Pitemid;
-                                       HeapTupleData Ptp;
+                                       HeapTupleHeader PTdata;
                                        VTupleLinkData vtld,
                                                           *vtlp;
 
+                                       /* Identify a target page to move this tuple to */
                                        if (to_vacpage == NULL ||
                                                !enough_space(to_vacpage, tlen))
                                        {
@@ -1952,18 +1965,17 @@ repair_frag(VRelStats *vacrelstats, Relation onerel,
                                        /* this can't happen since we saw tuple earlier: */
                                        if (!ItemIdIsUsed(Pitemid))
                                                elog(ERROR, "parent itemid marked as unused");
-                                       Ptp.t_datamcxt = NULL;
-                                       Ptp.t_data = (HeapTupleHeader) PageGetItem(Ppage, Pitemid);
+                                       PTdata = (HeapTupleHeader) PageGetItem(Ppage, Pitemid);
 
                                        /* ctid should not have changed since we saved it */
                                        Assert(ItemPointerEquals(&(vtld.new_tid),
-                                                                                        &(Ptp.t_data->t_ctid)));
+                                                                                        &(PTdata->t_ctid)));
 
                                        /*
-                                        * Read above about cases when !ItemIdIsUsed(Citemid)
+                                        * Read above about cases when !ItemIdIsUsed(nextItemid)
                                         * (child item is removed)... Due to the fact that at
                                         * the moment we don't remove unuseful part of
-                                        * update-chain, it's possible to get too old parent
+                                        * update-chain, it's possible to get non-matching parent
                                         * row here. Like as in the case which caused this
                                         * problem, we stop shrinking here. I could try to
                                         * find real parent row but want not to do it because
@@ -1971,7 +1983,7 @@ repair_frag(VRelStats *vacrelstats, Relation onerel,
                                         * and we are too close to 6.5 release. - vadim
                                         * 06/11/99
                                         */
-                                       if (!(TransactionIdEquals(HeapTupleHeaderGetXmax(Ptp.t_data),
+                                       if (!(TransactionIdEquals(HeapTupleHeaderGetXmax(PTdata),
                                                                         HeapTupleHeaderGetXmin(tp.t_data))))
                                        {
                                                ReleaseBuffer(Pbuf);
@@ -1979,8 +1991,8 @@ repair_frag(VRelStats *vacrelstats, Relation onerel,
                                                chain_move_failed = true;
                                                break;  /* out of check-all-items loop */
                                        }
-                                       tp.t_datamcxt = Ptp.t_datamcxt;
-                                       tp.t_data = Ptp.t_data;
+                                       tp.t_datamcxt = NULL;
+                                       tp.t_data = PTdata;
                                        tlen = tp.t_len = ItemIdGetLength(Pitemid);
                                        if (freeCbuf)
                                                ReleaseBuffer(Cbuf);
@@ -2499,16 +2511,27 @@ move_chain_tuple(Relation rel,
        newoff = PageAddItem(dst_page, (Item) newtup.t_data, tuple_len,
                                                 InvalidOffsetNumber, LP_USED);
        if (newoff == InvalidOffsetNumber)
-       {
                elog(PANIC, "failed to add item with len = %lu to page %u while moving tuple chain",
                         (unsigned long) tuple_len, dst_vacpage->blkno);
-       }
        newitemid = PageGetItemId(dst_page, newoff);
+       /* drop temporary copy, and point to the version on the dest page */
        pfree(newtup.t_data);
        newtup.t_datamcxt = NULL;
        newtup.t_data = (HeapTupleHeader) PageGetItem(dst_page, newitemid);
+
        ItemPointerSet(&(newtup.t_self), dst_vacpage->blkno, newoff);
 
+       /*
+        * Set new tuple's t_ctid pointing to itself if last tuple in chain,
+        * and to next tuple in chain otherwise.  (Since we move the chain
+        * in reverse order, this is actually the previously processed tuple.)
+        */
+       if (!ItemPointerIsValid(ctid))
+               newtup.t_data->t_ctid = newtup.t_self;
+       else
+               newtup.t_data->t_ctid = *ctid;
+       *ctid = newtup.t_self;
+
        /* XLOG stuff */
        if (!rel->rd_istemp)
        {
@@ -2533,17 +2556,6 @@ move_chain_tuple(Relation rel,
 
        END_CRIT_SECTION();
 
-       /*
-        * Set new tuple's t_ctid pointing to itself for last tuple in chain,
-        * and to next tuple in chain otherwise.
-        */
-       /* Is this ok after log_heap_move() and END_CRIT_SECTION()? */
-       if (!ItemPointerIsValid(ctid))
-               newtup.t_data->t_ctid = newtup.t_self;
-       else
-               newtup.t_data->t_ctid = *ctid;
-       *ctid = newtup.t_self;
-
        LockBuffer(dst_buf, BUFFER_LOCK_UNLOCK);
        if (dst_buf != old_buf)
                LockBuffer(old_buf, BUFFER_LOCK_UNLOCK);
index 1e7a1b5440f87b6dfa53eba3988283806015d4c2..a825c2816185e5fa5828e4cbae11f9c99b2373c8 100644 (file)
@@ -26,7 +26,7 @@
  *
  *
  * IDENTIFICATION
- *       $PostgreSQL: pgsql/src/backend/executor/execMain.c,v 1.241 2005/01/14 17:53:33 tgl Exp $
+ *       $PostgreSQL: pgsql/src/backend/executor/execMain.c,v 1.241.4.1 2005/08/25 19:45:00 tgl Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -1114,8 +1114,10 @@ lnext:   ;
                                foreach(l, estate->es_rowMark)
                                {
                                        execRowMark *erm = lfirst(l);
-                                       Buffer          buffer;
                                        HeapTupleData tuple;
+                                       Buffer          buffer;
+                                       ItemPointerData update_ctid;
+                                       TransactionId update_xmax;
                                        TupleTableSlot *newSlot;
                                        int                     test;
 
@@ -1133,6 +1135,7 @@ lnext:    ;
 
                                        tuple.t_self = *((ItemPointer) DatumGetPointer(datum));
                                        test = heap_mark4update(erm->relation, &tuple, &buffer,
+                                                                                       &update_ctid, &update_xmax,
                                                                                        estate->es_snapshot->curcid);
                                        ReleaseBuffer(buffer);
                                        switch (test)
@@ -1149,11 +1152,15 @@ lnext:  ;
                                                                ereport(ERROR,
                                                                                (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
                                                                                 errmsg("could not serialize access due to concurrent update")));
-                                                       if (!(ItemPointerEquals(&(tuple.t_self),
-                                                                 (ItemPointer) DatumGetPointer(datum))))
+                                                       if (!ItemPointerEquals(&update_ctid,
+                                                                                                  &tuple.t_self))
                                                        {
-                                                               newSlot = EvalPlanQual(estate, erm->rti, &(tuple.t_self));
-                                                               if (!(TupIsNull(newSlot)))
+                                                               /* updated, so look at updated version */
+                                                               newSlot = EvalPlanQual(estate,
+                                                                                                          erm->rti,
+                                                                                                          &update_ctid,
+                                                                                                          update_xmax);
+                                                               if (!TupIsNull(newSlot))
                                                                {
                                                                        slot = newSlot;
                                                                        estate->es_useEvalPlan = true;
@@ -1405,8 +1412,9 @@ ExecDelete(TupleTableSlot *slot,
 {
        ResultRelInfo *resultRelInfo;
        Relation        resultRelationDesc;
-       ItemPointerData ctid;
        int                     result;
+       ItemPointerData update_ctid;
+       TransactionId update_xmax;
 
        /*
         * get information on the (current) result relation
@@ -1437,7 +1445,7 @@ ExecDelete(TupleTableSlot *slot,
         */
 ldelete:;
        result = heap_delete(resultRelationDesc, tupleid,
-                                                &ctid,
+                                                &update_ctid, &update_xmax,
                                                 estate->es_snapshot->curcid,
                                                 estate->es_crosscheck_snapshot,
                                                 true /* wait for commit */ );
@@ -1455,14 +1463,17 @@ ldelete:;
                                ereport(ERROR,
                                                (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
                                                 errmsg("could not serialize access due to concurrent update")));
-                       else if (!(ItemPointerEquals(tupleid, &ctid)))
+                       else if (!ItemPointerEquals(tupleid, &update_ctid))
                        {
-                               TupleTableSlot *epqslot = EvalPlanQual(estate,
-                                                          resultRelInfo->ri_RangeTableIndex, &ctid);
+                               TupleTableSlot *epqslot;
 
+                               epqslot = EvalPlanQual(estate,
+                                                                          resultRelInfo->ri_RangeTableIndex,
+                                                                          &update_ctid,
+                                                                          update_xmax);
                                if (!TupIsNull(epqslot))
                                {
-                                       *tupleid = ctid;
+                                       *tupleid = update_ctid;
                                        goto ldelete;
                                }
                        }
@@ -1509,8 +1520,9 @@ ExecUpdate(TupleTableSlot *slot,
        HeapTuple       tuple;
        ResultRelInfo *resultRelInfo;
        Relation        resultRelationDesc;
-       ItemPointerData ctid;
        int                     result;
+       ItemPointerData update_ctid;
+       TransactionId update_xmax;
        int                     numIndices;
 
        /*
@@ -1578,7 +1590,7 @@ lreplace:;
         * referential integrity updates in serializable transactions.
         */
        result = heap_update(resultRelationDesc, tupleid, tuple,
-                                                &ctid,
+                                                &update_ctid, &update_xmax,
                                                 estate->es_snapshot->curcid,
                                                 estate->es_crosscheck_snapshot,
                                                 true /* wait for commit */ );
@@ -1596,14 +1608,17 @@ lreplace:;
                                ereport(ERROR,
                                                (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
                                                 errmsg("could not serialize access due to concurrent update")));
-                       else if (!(ItemPointerEquals(tupleid, &ctid)))
+                       else if (!(ItemPointerEquals(tupleid, &update_ctid)))
                        {
-                               TupleTableSlot *epqslot = EvalPlanQual(estate,
-                                                          resultRelInfo->ri_RangeTableIndex, &ctid);
+                               TupleTableSlot *epqslot;
 
+                               epqslot = EvalPlanQual(estate,
+                                                                          resultRelInfo->ri_RangeTableIndex,
+                                                                          &update_ctid,
+                                                                          update_xmax);
                                if (!TupIsNull(epqslot))
                                {
-                                       *tupleid = ctid;
+                                       *tupleid = update_ctid;
                                        tuple = ExecRemoveJunk(estate->es_junkFilter, epqslot);
                                        slot = ExecStoreTuple(tuple,
                                                                        estate->es_junkFilter->jf_resultSlot,
@@ -1750,9 +1765,21 @@ ExecConstraints(ResultRelInfo *resultRelInfo,
  * under READ COMMITTED rules.
  *
  * See backend/executor/README for some info about how this works.
+ *
+ *     estate - executor state data
+ *     rti - rangetable index of table containing tuple
+ *     *tid - t_ctid from the outdated tuple (ie, next updated version)
+ *     priorXmax - t_xmax from the outdated tuple
+ *
+ * *tid is also an output parameter: it's modified to hold the TID of the
+ * latest version of the tuple (note this may be changed even on failure)
+ *
+ * Returns a slot containing the new candidate update/delete tuple, or
+ * NULL if we determine we shouldn't process the row.
  */
 TupleTableSlot *
-EvalPlanQual(EState *estate, Index rti, ItemPointer tid)
+EvalPlanQual(EState *estate, Index rti,
+                        ItemPointer tid, TransactionId priorXmax)
 {
        evalPlanQual *epq;
        EState     *epqstate;
@@ -1796,11 +1823,24 @@ EvalPlanQual(EState *estate, Index rti, ItemPointer tid)
        {
                Buffer          buffer;
 
-               if (heap_fetch(relation, SnapshotDirty, &tuple, &buffer, false, NULL))
+               if (heap_fetch(relation, SnapshotDirty, &tuple, &buffer, true, NULL))
                {
-                       TransactionId xwait = SnapshotDirty->xmax;
+                       /*
+                        * If xmin isn't what we're expecting, the slot must have been
+                        * recycled and reused for an unrelated tuple.  This implies
+                        * that the latest version of the row was deleted, so we need
+                        * do nothing.  (Should be safe to examine xmin without getting
+                        * buffer's content lock, since xmin never changes in an existing
+                        * tuple.)
+                        */
+                       if (!TransactionIdEquals(HeapTupleHeaderGetXmin(tuple.t_data),
+                                                                        priorXmax))
+                       {
+                               ReleaseBuffer(buffer);
+                               return NULL;
+                       }
 
-                       /* xmin should not be dirty... */
+                       /* otherwise xmin should not be dirty... */
                        if (TransactionIdIsValid(SnapshotDirty->xmin))
                                elog(ERROR, "t_xmin is uncommitted in tuple to be updated");
 
@@ -1808,11 +1848,11 @@ EvalPlanQual(EState *estate, Index rti, ItemPointer tid)
                         * If tuple is being updated by other transaction then we have
                         * to wait for its commit/abort.
                         */
-                       if (TransactionIdIsValid(xwait))
+                       if (TransactionIdIsValid(SnapshotDirty->xmax))
                        {
                                ReleaseBuffer(buffer);
-                               XactLockTableWait(xwait);
-                               continue;
+                               XactLockTableWait(SnapshotDirty->xmax);
+                               continue;               /* loop back to repeat heap_fetch */
                        }
 
                        /*
@@ -1824,22 +1864,50 @@ EvalPlanQual(EState *estate, Index rti, ItemPointer tid)
                }
 
                /*
-                * Oops! Invalid tuple. Have to check is it updated or deleted.
-                * Note that it's possible to get invalid SnapshotDirty->tid if
-                * tuple updated by this transaction. Have we to check this ?
+                * If the referenced slot was actually empty, the latest version
+                * of the row must have been deleted, so we need do nothing.
                 */
-               if (ItemPointerIsValid(&(SnapshotDirty->tid)) &&
-                       !(ItemPointerEquals(&(tuple.t_self), &(SnapshotDirty->tid))))
+               if (tuple.t_data == NULL)
                {
-                       /* updated, so look at the updated copy */
-                       tuple.t_self = SnapshotDirty->tid;
-                       continue;
+                       ReleaseBuffer(buffer);
+                       return NULL;
                }
 
                /*
-                * Deleted or updated by this transaction; forget it.
+                * As above, if xmin isn't what we're expecting, do nothing.
                 */
-               return NULL;
+               if (!TransactionIdEquals(HeapTupleHeaderGetXmin(tuple.t_data),
+                                                                priorXmax))
+               {
+                       ReleaseBuffer(buffer);
+                       return NULL;
+               }
+
+               /*
+                * If we get here, the tuple was found but failed SnapshotDirty.
+                * Assuming the xmin is either a committed xact or our own xact
+                * (as it certainly should be if we're trying to modify the tuple),
+                * this must mean that the row was updated or deleted by either
+                * a committed xact or our own xact.  If it was deleted, we can
+                * ignore it; if it was updated then chain up to the next version
+                * and repeat the whole test.
+                *
+                * As above, it should be safe to examine xmax and t_ctid without
+                * the buffer content lock, because they can't be changing.
+                */
+               if (ItemPointerEquals(&tuple.t_self, &tuple.t_data->t_ctid))
+               {
+                       /* deleted, so forget about it */
+                       ReleaseBuffer(buffer);
+                       return NULL;
+               }
+
+               /* updated, so look at the updated row */
+               tuple.t_self = tuple.t_data->t_ctid;
+               /* updated row should have xmin matching this xmax */
+               priorXmax = HeapTupleHeaderGetXmax(tuple.t_data);
+               ReleaseBuffer(buffer);
+               /* loop back to fetch next in chain */
        }
 
        /*
index ef158d97bc3523c468a8e3cf8b4756b88fdfee29..eda347e27503b113ab6f6c73923d30106c6ac65c 100644 (file)
@@ -32,7 +32,7 @@
  * Portions Copyright (c) 1994, Regents of the University of California
  *
  * IDENTIFICATION
- *       $PostgreSQL: pgsql/src/backend/utils/time/tqual.c,v 1.81.4.1 2005/05/07 21:22:36 tgl Exp $
+ *       $PostgreSQL: pgsql/src/backend/utils/time/tqual.c,v 1.81.4.2 2005/08/25 19:45:01 tgl Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -1137,10 +1137,13 @@ HeapTupleSatisfiesVacuum(HeapTupleHeader tuple, TransactionId OldestXmin,
                                                        HeapTupleHeaderGetXmax(tuple)))
        {
                /*
-                * inserter also deleted it, so it was never visible to anyone
-                * else
+                * Inserter also deleted it, so it was never visible to anyone
+                * else.  However, we can only remove it early if it's not an
+                * updated tuple; else its parent tuple is linking to it via t_ctid,
+                * and this tuple mustn't go away before the parent does.
                 */
-               return HEAPTUPLE_DEAD;
+               if (!(tuple->t_infomask & HEAP_UPDATED))
+                       return HEAPTUPLE_DEAD;
        }
 
        if (!TransactionIdPrecedes(HeapTupleHeaderGetXmax(tuple), OldestXmin))
index 2b5495ff287c49c4e753da52c287a1d8c95f15a5..a45daa74ae73056089ef8859ea5a6017a9fec670 100644 (file)
@@ -7,7 +7,7 @@
  * Portions Copyright (c) 1996-2005, PostgreSQL Global Development Group
  * Portions Copyright (c) 1994, Regents of the University of California
  *
- * $PostgreSQL: pgsql/src/include/access/heapam.h,v 1.93 2004/12/31 22:03:21 pgsql Exp $
+ * $PostgreSQL: pgsql/src/include/access/heapam.h,v 1.93.4.1 2005/08/25 19:45:04 tgl Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -154,17 +154,21 @@ extern bool heap_release_fetch(Relation relation, Snapshot snapshot,
                                   HeapTuple tuple, Buffer *userbuf, bool keep_buf,
                                   PgStat_Info *pgstat_info);
 
-extern ItemPointer heap_get_latest_tid(Relation relation, Snapshot snapshot,
+extern void heap_get_latest_tid(Relation relation, Snapshot snapshot,
                                        ItemPointer tid);
 extern void setLastTid(const ItemPointer tid);
 
 extern Oid     heap_insert(Relation relation, HeapTuple tup, CommandId cid);
-extern int heap_delete(Relation relation, ItemPointer tid, ItemPointer ctid,
-                       CommandId cid, Snapshot crosscheck, bool wait);
-extern int heap_update(Relation relation, ItemPointer otid, HeapTuple tup,
-               ItemPointer ctid, CommandId cid, Snapshot crosscheck, bool wait);
-extern int heap_mark4update(Relation relation, HeapTuple tup,
-                                Buffer *userbuf, CommandId cid);
+extern int heap_delete(Relation relation, ItemPointer tid,
+                                          ItemPointer ctid, TransactionId *update_xmax,
+                                          CommandId cid, Snapshot crosscheck, bool wait);
+extern int heap_update(Relation relation, ItemPointer otid,
+                                          HeapTuple newtup,
+                                          ItemPointer ctid, TransactionId *update_xmax,
+                                          CommandId cid, Snapshot crosscheck, bool wait);
+extern int heap_mark4update(Relation relation, HeapTuple tuple,
+                                                       Buffer *buffer, ItemPointer ctid,
+                                                       TransactionId *update_xmax, CommandId cid);
 
 extern Oid     simple_heap_insert(Relation relation, HeapTuple tup);
 extern void simple_heap_delete(Relation relation, ItemPointer tid);
index f07bdd89d182a192ce0fb179f5364caffd256757..6c7eff0d46dabf1217ac93c549e67f18b0235a0f 100644 (file)
@@ -7,7 +7,7 @@
  * Portions Copyright (c) 1996-2005, PostgreSQL Global Development Group
  * Portions Copyright (c) 1994, Regents of the University of California
  *
- * $PostgreSQL: pgsql/src/include/executor/executor.h,v 1.115 2004/12/31 22:03:29 pgsql Exp $
+ * $PostgreSQL: pgsql/src/include/executor/executor.h,v 1.115.4.1 2005/08/25 19:45:06 tgl Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -109,7 +109,7 @@ extern bool ExecContextForcesOids(PlanState *planstate, bool *hasoids);
 extern void ExecConstraints(ResultRelInfo *resultRelInfo,
                                TupleTableSlot *slot, EState *estate);
 extern TupleTableSlot *EvalPlanQual(EState *estate, Index rti,
-                        ItemPointer tid);
+                                                                       ItemPointer tid, TransactionId priorXmax);
 
 /*
  * prototypes from functions in execProcnode.c