]> git.ipfire.org Git - thirdparty/postgresql.git/commitdiff
Fix LMGR for MVCC.
authorVadim B. Mikheev <vadim4o@yahoo.com>
Fri, 7 May 1999 01:23:11 +0000 (01:23 +0000)
committerVadim B. Mikheev <vadim4o@yahoo.com>
Fri, 7 May 1999 01:23:11 +0000 (01:23 +0000)
Get rid of Extend lock mode.

src/backend/access/heap/hio.c
src/backend/storage/lmgr/lmgr.c
src/backend/storage/lmgr/lock.c
src/backend/storage/lmgr/proc.c
src/include/storage/lmgr.h
src/include/storage/lock.h
src/include/storage/proc.h

index 3b4fa4af0bb08daccde6296e0bdd12b1d0b6d058..f991b206d4426f147e4bac3242c6f0aa61dfa2b9 100644 (file)
@@ -7,7 +7,7 @@
  *
  *
  * IDENTIFICATION
- *       $Id: hio.c,v 1.18 1999/05/01 15:04:46 vadim Exp $
+ *       $Id: hio.c,v 1.19 1999/05/07 01:22:53 vadim Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -111,14 +111,13 @@ RelationPutHeapTupleAtEnd(Relation relation, HeapTuple tuple)
        Item            item;
 
        /*
-        * Actually, we lock _relation_ here, not page, but I believe
-        * that locking page is faster... Obviously, we could get rid
-        * of ExtendLock mode at all and use ExclusiveLock mode on
-        * page 0, as long as we use page-level locking for indices only,
-        * but we are in 6.5-beta currently...  - vadim 05/01/99
+        * Lock relation for extention. We can use LockPage here as long as 
+        * in all other places we use page-level locking for indices only.
+        * Alternatevely, we could define pseudo-table as we do for
+        * transactions with XactLockTable.
         */
        if (!relation->rd_myxactonly)
-               LockPage(relation, 0, ExtendLock);
+               LockPage(relation, 0, ExclusiveLock);
 
        /*
         * XXX This does an lseek - VERY expensive - but at the moment it is
@@ -166,7 +165,7 @@ RelationPutHeapTupleAtEnd(Relation relation, HeapTuple tuple)
        }
 
        if (!relation->rd_myxactonly)
-               UnlockPage(relation, 0, ExtendLock);
+               UnlockPage(relation, 0, ExclusiveLock);
 
        offnum = PageAddItem((Page) pageHeader, (Item) tuple->t_data,
                                                 tuple->t_len, InvalidOffsetNumber, LP_USED);
index 7d34499b01704b7e1c9858a489782b0a136b97fc..7eb51207c7e5a69ca4e1e6214b6dbbb4d1bc77ce 100644 (file)
@@ -7,7 +7,7 @@
  *
  *
  * IDENTIFICATION
- *       $Header: /cvsroot/pgsql/src/backend/storage/lmgr/lmgr.c,v 1.22 1999/02/13 23:18:24 momjian Exp $
+ *       $Header: /cvsroot/pgsql/src/backend/storage/lmgr/lmgr.c,v 1.23 1999/05/07 01:23:02 vadim Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -79,9 +79,6 @@ static MASK LockConflicts[] = {
        (1 << RowExclusiveLock) | (1 << RowShareLock) | (1 << AccessExclusiveLock) | 
        (1 << AccessShareLock),
 
-/* ExtendLock */
-       (1 << ExtendLock)
-
 };
 
 static int     LockPrios[] = {
@@ -92,8 +89,7 @@ static int    LockPrios[] = {
        4,
        5,
        6,
-       7,
-       1
+       7
 };
 
 LOCKMETHOD     LockTableId = (LOCKMETHOD) NULL;
index c39138ff129fe88371a5bf8d2ef79ac0de3fbf45..c1996fe0713a9f0cfec276ee269f5ed95792fcd4 100644 (file)
@@ -7,7 +7,7 @@
  *
  *
  * IDENTIFICATION
- *       $Header: /cvsroot/pgsql/src/backend/storage/lmgr/lock.c,v 1.49 1999/04/30 17:03:04 momjian Exp $
+ *       $Header: /cvsroot/pgsql/src/backend/storage/lmgr/lock.c,v 1.50 1999/05/07 01:23:03 vadim Exp $
  *
  * NOTES
  *       Outside modules can create a lock table and acquire/release
@@ -50,8 +50,7 @@
 #include "utils/trace.h"
 #include "utils/ps_status.h"
 
-static int WaitOnLock(LOCKMETHOD lockmethod, LOCK *lock, LOCKMODE lockmode,
-                  TransactionId xid);
+static int WaitOnLock(LOCKMETHOD lockmethod, LOCK *lock, LOCKMODE lockmode);
 
 /*
  * lockDebugRelation can be used to trace unconditionally a single relation,
@@ -143,12 +142,14 @@ static int WaitOnLock(LOCKMETHOD lockmethod, LOCK *lock, LOCKMODE lockmode,
 #endif  /* !LOCK_MGR_DEBUG */
 
 static char *lock_types[] = {
-       "",
-       "WRITE",
-       "READ",
-       "WRITE INTENT",
-       "READ INTENT",
-       "EXTEND"
+       "INVALID",
+       "AccessShareLock",
+       "RowShareLock",
+       "RowExclusiveLock",
+       "ShareLock",
+       "ShareRowExclusiveLock",
+       "ExclusiveLock",
+       "AccessExclusiveLock"
 };
 
 SPINLOCK       LockMgrLock;            /* in Shmem or created in
@@ -631,12 +632,11 @@ LockAcquire(LOCKMETHOD lockmethod, LOCKTAG *locktag, LOCKMODE lockmode)
 
        /* --------------------
         * If I'm the only one holding a lock, then there
-        * cannot be a conflict.  Need to subtract one from the
-        * lock's count since we just bumped the count up by 1
-        * above.
+        * cannot be a conflict. The same is true if we already
+        * hold this lock.
         * --------------------
         */
-       if (result->nHolding == lock->nActive)
+       if (result->nHolding == lock->nActive || result->holders[lockmode] != 0)
        {
                result->holders[lockmode]++;
                result->nHolding++;
@@ -647,7 +647,39 @@ LockAcquire(LOCKMETHOD lockmethod, LOCKTAG *locktag, LOCKMODE lockmode)
                return TRUE;
        }
 
-       status = LockResolveConflicts(lockmethod, lock, lockmode, xid, result);
+       /*
+        * If lock requested conflicts with locks requested by waiters...
+        */
+       if (lockMethodTable->ctl->conflictTab[lockmode] & lock->waitMask)
+       {
+               int             i = 1;
+
+               /*
+                * If I don't hold locks or my locks don't conflict 
+                * with waiters then force to sleep.
+                */
+               if (result->nHolding > 0)
+               {
+                       for ( ; i <= lockMethodTable->ctl->numLockModes; i++)
+                       {
+                               if (result->holders[i] > 0 && 
+                                       lockMethodTable->ctl->conflictTab[i] & lock->waitMask)
+                                       break;  /* conflict */
+                       }
+               }
+       
+               if (result->nHolding == 0 || i > lockMethodTable->ctl->numLockModes)
+               {
+                       XID_PRINT("LockAcquire: higher priority proc waiting",
+                                         result);
+                       status = STATUS_FOUND;
+               }
+               else
+                       status = LockResolveConflicts(lockmethod, lock, lockmode, xid, result);
+       }
+       else
+               status = LockResolveConflicts(lockmethod, lock, lockmode, xid, result);
+
        if (status == STATUS_OK)
                GrantLock(lock, lockmode);
        else if (status == STATUS_FOUND)
@@ -680,7 +712,25 @@ LockAcquire(LOCKMETHOD lockmethod, LOCKTAG *locktag, LOCKMODE lockmode)
                        return FALSE;
                }
 #endif
-               status = WaitOnLock(lockmethod, lock, lockmode, xid);
+               /*
+                * Construct bitmask of locks we hold before going to sleep.
+                */
+               MyProc->holdLock = 0;
+               if (result->nHolding > 0)
+               {
+                       int             i,
+                                       tmpMask = 2;
+
+                       for (i = 1; i <= lockMethodTable->ctl->numLockModes; 
+                                       i++, tmpMask <<= 1)
+                       {
+                               if (result->holders[i] > 0)
+                                       MyProc->holdLock |= tmpMask;
+                       }
+                       Assert(MyProc->holdLock != 0);
+               }
+
+               status = WaitOnLock(lockmethod, lock, lockmode);
 
                /*
                 * Check the xid entry status, in case something in the ipc
@@ -712,10 +762,6 @@ LockAcquire(LOCKMETHOD lockmethod, LOCKTAG *locktag, LOCKMODE lockmode)
  * determining whether or not any new lock acquired conflicts with
  * the old ones.
  *
- *     For example, if I am already holding a WRITE_INTENT lock,
- *     there will not be a conflict with my own READ_LOCK.  If I
- *     don't consider the intent lock when checking for conflicts,
- *     I find no conflict.
  * ----------------------------
  */
 int
@@ -812,32 +858,6 @@ LockResolveConflicts(LOCKMETHOD lockmethod,
        }
        Assert((result->nHolding >= 0) && (result->holders[lockmode] >= 0));
 
-       /*
-        * We can control runtime this option. Default is lockReadPriority=0
-        */
-       if (!lockReadPriority)
-       {
-               /* ------------------------
-                * If someone with a greater priority is waiting for the lock,
-                * do not continue and share the lock, even if we can.
-                * Don't do this if the process already has some locks, because
-                * this could hold up other people waiting on our locks, causing
-                * a priority inversion.  bjm
-                * ------------------------
-                */
-               int                     myprio = LockMethodTable[lockmethod]->ctl->prio[lockmode];
-               PROC_QUEUE *waitQueue = &(lock->waitProcs);
-               PROC       *topproc = (PROC *) MAKE_PTR(waitQueue->links.prev);
-
-               if (SHMQueueEmpty(&MyProc->lockQueue) && waitQueue->size &&
-                       topproc->prio > myprio)
-               {
-                       XID_PRINT("LockResolveConflicts: higher priority proc waiting",
-                                         result);
-                       return STATUS_FOUND;
-               }
-       }
-
        /* ----------------------------
         * first check for global conflicts: If no locks conflict
         * with mine, then I get the lock.
@@ -909,12 +929,10 @@ GrantLock(LOCK *lock, LOCKMODE lockmode)
 }
 
 static int
-WaitOnLock(LOCKMETHOD lockmethod, LOCK *lock, LOCKMODE lockmode,
-                  TransactionId xid)
+WaitOnLock(LOCKMETHOD lockmethod, LOCK *lock, LOCKMODE lockmode)
 {
        PROC_QUEUE *waitQueue = &(lock->waitProcs);
        LOCKMETHODTABLE *lockMethodTable = LockMethodTable[lockmethod];
-       int                     prio = lockMethodTable->ctl->prio[lockmode];
        char            old_status[64],
                                new_status[64];
 
@@ -934,11 +952,9 @@ WaitOnLock(LOCKMETHOD lockmethod, LOCK *lock, LOCKMODE lockmode,
        strcat(new_status, " waiting");
        PS_SET_STATUS(new_status);
        if (ProcSleep(waitQueue,
-                                 lockMethodTable->ctl->masterLock,
+                                 lockMethodTable->ctl,
                                  lockmode,
-                                 prio,
-                                 lock,
-                                 xid) != NO_ERROR)
+                                 lock) != NO_ERROR)
        {
                /* -------------------
                 * This could have happend as a result of a deadlock,
@@ -952,12 +968,16 @@ WaitOnLock(LOCKMETHOD lockmethod, LOCK *lock, LOCKMODE lockmode,
                LOCK_PRINT_AUX("WaitOnLock: aborting on lock", lock, lockmode);
                Assert((lock->nHolding >= 0) && (lock->holders[lockmode] >= 0));
                Assert(lock->nActive <= lock->nHolding);
+               if (lock->activeHolders[lockmode] == lock->holders[lockmode])
+                       lock->waitMask &= BITS_OFF[lockmode];
                SpinRelease(lockMethodTable->ctl->masterLock);
                elog(ERROR, "WaitOnLock: error on wakeup - Aborting this transaction");
 
                /* not reached */
        }
 
+       if (lock->activeHolders[lockmode] == lock->holders[lockmode])
+               lock->waitMask &= BITS_OFF[lockmode];
        PS_SET_STATUS(old_status);
        LOCK_PRINT_AUX("WaitOnLock: wakeup on lock", lock, lockmode);
        return STATUS_OK;
@@ -1129,6 +1149,7 @@ LockRelease(LOCKMETHOD lockmethod, LOCKTAG *locktag, LOCKMODE lockmode)
        lock->nActive--;
        lock->activeHolders[lockmode]--;
 
+#ifdef NOT_USED
        /* --------------------------
         * If there are still active locks of the type I just released, no one
         * should be woken up.  Whoever is asleep will still conflict
@@ -1138,6 +1159,19 @@ LockRelease(LOCKMETHOD lockmethod, LOCKTAG *locktag, LOCKMODE lockmode)
        if (lock->activeHolders[lockmode])
                wakeupNeeded = false;
        else
+#endif
+       /*
+        * Above is not valid any more (due to MVCC lock modes). 
+        * Actually we should compare activeHolders[lockmode] with 
+        * number of waiters holding lock of this type and try to
+        * wakeup only if these numbers are equal (and lock released 
+        * conflicts with locks requested by waiters). For the moment
+        * we only check the last condition.
+        */
+       if (lockMethodTable->ctl->conflictTab[lockmode] & lock->waitMask)
+               wakeupNeeded = true;
+
+       if (!(lock->activeHolders[lockmode]))
        {
                /* change the conflict mask.  No more of this lock type. */
                lock->mask &= BITS_OFF[lockmode];
@@ -1199,12 +1233,6 @@ LockRelease(LOCKMETHOD lockmethod, LOCKTAG *locktag, LOCKMODE lockmode)
 
        if (wakeupNeeded)
        {
-               /* --------------------------
-                * Wake the first waiting process and grant him the lock if it
-                * doesn't conflict.  The woken process must record the lock
-                * himself.
-                * --------------------------
-                */
                ProcLockWakeup(&(lock->waitProcs), lockmethod, lock);
        }
        else
@@ -1275,6 +1303,7 @@ LockReleaseAll(LOCKMETHOD lockmethod, SHM_QUEUE *lockQueue)
 
        for (;;)
        {
+               bool    wakeupNeeded = false;
 
                /*
                 * Sometimes the queue appears to be messed up.
@@ -1380,6 +1409,12 @@ LockReleaseAll(LOCKMETHOD lockmethod, SHM_QUEUE *lockQueue)
                                           &&(lock->activeHolders[i] >= 0));
                                if (!lock->activeHolders[i])
                                        lock->mask &= BITS_OFF[i];
+                               /*
+                                * Read comments in LockRelease
+                                */
+                               if (!wakeupNeeded && xidLook->holders[i] > 0 && 
+                                       lockMethodTable->ctl->conflictTab[i] & lock->waitMask)
+                                       wakeupNeeded = true;
                        }
                        lock->nHolding -= xidLook->nHolding;
                        lock->nActive -= xidLook->nHolding;
@@ -1444,14 +1479,8 @@ LockReleaseAll(LOCKMETHOD lockmethod, SHM_QUEUE *lockQueue)
                                return FALSE;
                        }
                }
-               else
+               else if (wakeupNeeded)
                {
-                       /* --------------------
-                        * Wake the first waiting process and grant him the lock if it
-                        * doesn't conflict.  The woken process must record the lock
-                        * him/herself.
-                        * --------------------
-                        */
                        waitQueue = &(lock->waitProcs);
                        ProcLockWakeup(waitQueue, lockmethod, lock);
                }
@@ -1534,46 +1563,22 @@ LockingDisabled()
 bool
 DeadLockCheck(SHM_QUEUE *lockQueue, LOCK *findlock, bool skip_check)
 {
-       int                     done;
-       XIDLookupEnt *xidLook = NULL;
-       XIDLookupEnt *tmp = NULL;
-       SHMEM_OFFSET end = MAKE_OFFSET(lockQueue);
-       LOCK       *lock;
+       int                                             done;
+       XIDLookupEnt               *xidLook = NULL;
+       XIDLookupEnt               *tmp = NULL;
+       SHMEM_OFFSET                    end = MAKE_OFFSET(lockQueue);
+       LOCK                               *lock;
 
-       LOCKMETHODTABLE *lockMethodTable;
-       XIDLookupEnt *result,
-                               item;
-       HTAB       *xidTable;
-       bool            found;
-
-       static PROC *checked_procs[MAXBACKENDS];
-       static int      nprocs;
-       static bool MyNHolding;
+       static PROC                        *checked_procs[MAXBACKENDS];
+       static int                              nprocs;
 
        /* initialize at start of recursion */
        if (skip_check)
        {
                checked_procs[0] = MyProc;
                nprocs = 1;
-
-               lockMethodTable = LockMethodTable[DEFAULT_LOCKMETHOD];
-               xidTable = lockMethodTable->xidHash;
-
-               MemSet(&item, 0, XID_TAGSIZE);
-               TransactionIdStore(MyProc->xid, &item.tag.xid);
-               item.tag.lock = MAKE_OFFSET(findlock);
-#ifdef NOT_USED
-               item.tag.pid = pid;
-#endif
-
-               if (!(result = (XIDLookupEnt *)
-                         hash_search(xidTable, (Pointer) &item, HASH_FIND, &found)) || !found)
-               {
-                       elog(NOTICE, "LockAcquire: xid table corrupted");
-                       return true;
-               }
-               MyNHolding = result->nHolding;
        }
+
        if (SHMQueueEmpty(lockQueue))
                return false;
 
@@ -1583,12 +1588,6 @@ DeadLockCheck(SHM_QUEUE *lockQueue, LOCK *findlock, bool skip_check)
 
        for (;;)
        {
-               /* ---------------------------
-                * XXX Here we assume the shared memory queue is circular and
-                * that we know its internal structure.  Should have some sort of
-                * macros to allow one to walk it.      mer 20 July 1991
-                * ---------------------------
-                */
                done = (xidLook->queue.next == end);
                lock = (LOCK *) MAKE_PTR(xidLook->tag.lock);
 
@@ -1613,45 +1612,21 @@ DeadLockCheck(SHM_QUEUE *lockQueue, LOCK *findlock, bool skip_check)
                        proc = (PROC *) MAKE_PTR(waitQueue->links.prev);
                        for (i = 0; i < waitQueue->size; i++)
                        {
+                               /*
+                                * If I hold some locks on findlock and another proc 
+                                * waits on it holding locks too - check if we are
+                                * waiting one another.
+                                */
                                if (proc != MyProc &&
-                                       lock == findlock && /* skip_check also true */
-                                       MyNHolding) /* I already hold some lock on it */
+                                       lock == findlock &&     /* skip_check also true */
+                                       MyProc->holdLock)
                                {
-
-                                       /*
-                                        * For findlock's wait queue, we are interested in
-                                        * procs who are blocked waiting for a write-lock on
-                                        * the table we are waiting on, and already hold a
-                                        * lock on it. We first check to see if there is an
-                                        * escalation deadlock, where we hold a readlock and
-                                        * want a writelock, and someone else holds readlock
-                                        * on the same table, and wants a writelock.
-                                        *
-                                        * Basically, the test is, "Do we both hold some lock on
-                                        * findlock, and we are both waiting in the lock
-                                        * queue?"   bjm
-                                        */
+                                       LOCKMETHODCTL  *lockctl = 
+                                                       LockMethodTable[DEFAULT_LOCKMETHOD]->ctl;
 
                                        Assert(skip_check);
-                                       Assert(MyProc->prio >= 2);
-
-                                       lockMethodTable = LockMethodTable[1];
-                                       xidTable = lockMethodTable->xidHash;
-
-                                       MemSet(&item, 0, XID_TAGSIZE);
-                                       TransactionIdStore(proc->xid, &item.tag.xid);
-                                       item.tag.lock = MAKE_OFFSET(findlock);
-#ifdef NOT_USED
-                                       item.tag.pid = pid;
-#endif
-
-                                       if (!(result = (XIDLookupEnt *)
-                                                 hash_search(xidTable, (Pointer) &item, HASH_FIND, &found)) || !found)
-                                       {
-                                               elog(NOTICE, "LockAcquire: xid table corrupted");
-                                               return true;
-                                       }
-                                       if (result->nHolding)
+                                       if (lockctl->conflictTab[MyProc->token] & proc->holdLock && 
+                                               lockctl->conflictTab[proc->token] & MyProc->holdLock)
                                                return true;
                                }
 
index 229a78587cbd643389b2c42aac4f00fcbe29ef1f..b80d32e1b44e727c3b4cbb9568936c884781f283 100644 (file)
@@ -7,7 +7,7 @@
  *
  *
  * IDENTIFICATION
- *       $Header: /cvsroot/pgsql/src/backend/storage/lmgr/proc.c,v 1.53 1999/04/30 02:04:51 momjian Exp $
+ *       $Header: /cvsroot/pgsql/src/backend/storage/lmgr/proc.c,v 1.54 1999/05/07 01:23:04 vadim Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -46,7 +46,7 @@
  *             This is so that we can support more backends. (system-wide semaphore
  *             sets run out pretty fast.)                                -ay 4/95
  *
- * $Header: /cvsroot/pgsql/src/backend/storage/lmgr/proc.c,v 1.53 1999/04/30 02:04:51 momjian Exp $
+ * $Header: /cvsroot/pgsql/src/backend/storage/lmgr/proc.c,v 1.54 1999/05/07 01:23:04 vadim Exp $
  */
 #include <sys/time.h>
 #include <unistd.h>
@@ -106,6 +106,8 @@ static void ProcKill(int exitStatus, int pid);
 static void ProcGetNewSemKeyAndNum(IPCKey *key, int *semNum);
 static void ProcFreeSem(IpcSemaphoreKey semKey, int semNum);
 
+static char *DeadLockMessage = "Deadlock detected -- See the lock(l) manual page for a possible cause.";
+
 /*
  * InitProcGlobal -
  *       initializes the global process table. We put it here so that
@@ -488,68 +490,80 @@ ProcQueueInit(PROC_QUEUE *queue)
  */
 int
 ProcSleep(PROC_QUEUE *waitQueue,/* lock->waitProcs */
-                 SPINLOCK spinlock,
+                 LOCKMETHODCTL *lockctl,
                  int token,                    /* lockmode */
-                 int prio,
-                 LOCK *lock,
-                 TransactionId xid)    /* needed by user locks, see below */
+                 LOCK *lock)
 {
        int                     i;
+       SPINLOCK        spinlock = lockctl->masterLock;
        PROC       *proc;
+       int                     myMask = (1 << token);
+       int                     waitMask = lock->waitMask;
+       int                     aheadHolders[MAX_LOCKMODES];
+       bool            selfConflict = (lockctl->conflictTab[token] & myMask),
+                               prevSame = false;
        bool            deadlock_checked = false;
        struct itimerval timeval,
                                dummy;
 
-       /*
-        * If the first entries in the waitQueue have a greater priority than
-        * we have, we must be a reader, and they must be a writers, and we
-        * must be here because the current holder is a writer or a reader but
-        * we don't share shared locks if a writer is waiting. We put
-        * ourselves after the writers.  This way, we have a FIFO, but keep
-        * the readers together to give them decent priority, and no one
-        * starves.  Because we group all readers together, a non-empty queue
-        * only has a few possible configurations:
-        *
-        * [readers] [writers] [readers][writers] [writers][readers]
-        * [writers][readers][writers]
-        *
-        * In a full queue, we would have a reader holding a lock, then a writer
-        * gets the lock, then a bunch of readers, made up of readers who
-        * could not share the first readlock because a writer was waiting,
-        * and new readers arriving while the writer had the lock.  bjm
-        */
+       MyProc->token = token;
+       MyProc->waitLock = lock;
+
        proc = (PROC *) MAKE_PTR(waitQueue->links.prev);
 
-       /* If we are a reader, and they are writers, skip past them */
-       for (i = 0; i < waitQueue->size && proc->prio > prio; i++)
-               proc = (PROC *) MAKE_PTR(proc->links.prev);
+       /* if we don't conflict with any waiter - be first in queue */
+       if (!(lockctl->conflictTab[token] & waitMask))
+               goto ins;
 
-       /* The rest of the queue is FIFO, with readers first, writers last */
-       for (; i < waitQueue->size && proc->prio <= prio; i++)
-               proc = (PROC *) MAKE_PTR(proc->links.prev);
+       for (i = 1; i < MAX_LOCKMODES; i++)
+               aheadHolders[i] = lock->activeHolders[i];
+       (aheadHolders[token])++;
 
-       MyProc->prio = prio;
-       MyProc->token = token;
-       MyProc->waitLock = lock;
+       for (i = 0; i < waitQueue->size; i++)
+       {
+               /* am I waiting for him ? */
+               if (lockctl->conflictTab[token] & proc->holdLock)
+               {
+                       /* is he waiting for me ? */
+                       if (lockctl->conflictTab[proc->token] & MyProc->holdLock)
+                       {
+                               MyProc->errType = STATUS_ERROR;
+                               elog(NOTICE, DeadLockMessage);
+                               goto rt;
+                       }
+                       /* being waiting for him - go past */
+               }
+               /* if he waits for me */
+               else if (lockctl->conflictTab[proc->token] & MyProc->holdLock)
+               {
+                       break;
+               }
+               /* if conflicting locks requested */
+               else if (lockctl->conflictTab[proc->token] & myMask)
+               {
+                       /*
+                        * If I request non self-conflicting lock and there
+                        * are others requesting the same lock just before me -
+                        * stay here.
+                        */
+                       if (!selfConflict && prevSame)
+                               break;
+               }
+               /*
+                * Last attempt to don't move any more: if we don't conflict
+                * with rest waiters in queue.
+                */
+               else if (!(lockctl->conflictTab[token] & waitMask))
+                       break;
 
-#ifdef USER_LOCKS
-       /* -------------------
-        * Currently, we only need this for the ProcWakeup routines.
-        * This must be 0 for user lock, so we can't just use the value
-        * from GetCurrentTransactionId().
-        * -------------------
-        */
-       TransactionIdStore(xid, &MyProc->xid);
-#else
-#ifndef LowLevelLocking
-       /* -------------------
-        * currently, we only need this for the ProcWakeup routines
-        * -------------------
-        */
-       TransactionIdStore((TransactionId) GetCurrentTransactionId(), &MyProc->xid);
-#endif
-#endif
+               prevSame = (proc->token == token);
+               (aheadHolders[proc->token])++;
+               if (aheadHolders[proc->token] == lock->holders[proc->token])
+                       waitMask &= ~ (1 << proc->token);
+               proc = (PROC *) MAKE_PTR(proc->links.prev);
+       }
 
+ins:;
        /* -------------------
         * assume that these two operations are atomic (because
         * of the spinlock).
@@ -558,6 +572,7 @@ ProcSleep(PROC_QUEUE *waitQueue,/* lock->waitProcs */
        SHMQueueInsertTL(&(proc->links), &(MyProc->links));
        waitQueue->size++;
 
+       lock->waitMask |= myMask;
        SpinRelease(spinlock);
 
        /* --------------
@@ -608,6 +623,8 @@ ProcSleep(PROC_QUEUE *waitQueue,/* lock->waitProcs */
         */
        SpinAcquire(spinlock);
 
+rt:;
+
 #ifdef LOCK_MGR_DEBUG
        /* Just to get meaningful debug messages from DumpLocks() */
        MyProc->waitLock = (LOCK *) NULL;
@@ -655,9 +672,9 @@ int
 ProcLockWakeup(PROC_QUEUE *queue, LOCKMETHOD lockmethod, LOCK *lock)
 {
        PROC       *proc;
-       int                     count;
+       int                     count = 0;
        int                     trace_flag;
-       int                     last_locktype = -1;
+       int                     last_locktype = 0;
        int                     queue_size = queue->size;
 
        Assert(queue->size >= 0);
@@ -666,7 +683,6 @@ ProcLockWakeup(PROC_QUEUE *queue, LOCKMETHOD lockmethod, LOCK *lock)
                return STATUS_NOT_FOUND;
 
        proc = (PROC *) MAKE_PTR(queue->links.prev);
-       count = 0;
        while ((queue_size--) && (proc))
        {
 
@@ -678,7 +694,7 @@ ProcLockWakeup(PROC_QUEUE *queue, LOCKMETHOD lockmethod, LOCK *lock)
                        continue;
 
                /*
-                * This proc conflicts with locks held by others, ignored.
+                * Does this proc conflict with locks held by others ?
                 */
                if (LockResolveConflicts(lockmethod,
                                                                 lock,
@@ -686,6 +702,8 @@ ProcLockWakeup(PROC_QUEUE *queue, LOCKMETHOD lockmethod, LOCK *lock)
                                                                 proc->xid,
                                                                 (XIDLookupEnt *) NULL) != STATUS_OK)
                {
+                       if (count != 0)
+                               break;
                        last_locktype = proc->token;
                        continue;
                }
@@ -828,7 +846,7 @@ HandleDeadLock(int sig)
         */
        UnlockLockTable();
 
-       elog(NOTICE, "Deadlock detected -- See the lock(l) manual page for a possible cause.");
+       elog(NOTICE, DeadLockMessage);
        return;
 }
 
index 0e78c33de9b51f0bae8fdea790075ccfc046de63..3af0071d468f51817a343a723967800786d73b25 100644 (file)
@@ -6,7 +6,7 @@
  *
  * Copyright (c) 1994, Regents of the University of California
  *
- * $Id: lmgr.h,v 1.18 1999/02/19 06:06:34 tgl Exp $
+ * $Id: lmgr.h,v 1.19 1999/05/07 01:23:05 vadim Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -25,8 +25,6 @@
 #define ExclusiveLock                  6
 #define AccessExclusiveLock            7
 
-#define ExtendLock                             8
-
 extern LOCKMETHOD LockTableId;
 
 
index da77f1d523ccbe86be493fb7f04c02dad10a0a43..387f164247cacbad8b8457c646d7390f2e76c38c 100644 (file)
@@ -6,7 +6,7 @@
  *
  * Copyright (c) 1994, Regents of the University of California
  *
- * $Id: lock.h,v 1.24 1999/03/06 21:17:43 tgl Exp $
+ * $Id: lock.h,v 1.25 1999/05/07 01:23:07 vadim Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -41,7 +41,7 @@ typedef int LOCKMODE;
 typedef int LOCKMETHOD;
 
 /* MAX_LOCKMODES cannot be larger than the bits in MASK */
-#define MAX_LOCKMODES  9
+#define MAX_LOCKMODES  8
 
 /*
  * MAX_LOCK_METHODS corresponds to the number of spin locks allocated in
@@ -204,6 +204,7 @@ typedef struct LOCK
 
        /* data */
        int                     mask;
+       int                     waitMask;
        PROC_QUEUE      waitProcs;
        int                     holders[MAX_LOCKMODES];
        int                     nHolding;
index 952f50553ca47a6662604ad0eff2c20259e9fc35..53b677858fd59c4d88bef2fdcc7c742addf42219 100644 (file)
@@ -6,7 +6,7 @@
  *
  * Copyright (c) 1994, Regents of the University of California
  *
- * $Id: proc.h,v 1.20 1999/02/19 07:10:47 tgl Exp $
+ * $Id: proc.h,v 1.21 1999/05/07 01:23:07 vadim Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -48,8 +48,9 @@ typedef struct proc
                                                                 * were starting our xact: vacuum must not
                                                                 * remove tuples deleted by xid >= xmin ! */
 
-       LOCK       *waitLock;           /* Lock we're sleeping on */
-       int                     token;                  /* info for proc wakeup routines */
+       LOCK       *waitLock;           /* Lock we're sleeping on ... */
+       int                     token;                  /* type of lock we sleeping for */
+       int                     holdLock;               /* while holding these locks */
        int                     pid;                    /* This procs process id */
        short           sLocks[MAX_SPINS];              /* Spin lock stats */
        SHM_QUEUE       lockQueue;              /* locks associated with current
@@ -116,8 +117,8 @@ extern bool ProcRemove(int pid);
 /* make static in storage/lmgr/proc.c -- jolly */
 
 extern void ProcQueueInit(PROC_QUEUE *queue);
-extern int ProcSleep(PROC_QUEUE *queue, SPINLOCK spinlock, int token,
-                 int prio, LOCK *lock, TransactionId xid);
+extern int ProcSleep(PROC_QUEUE *queue, LOCKMETHODCTL *lockctl, int token, 
+                                       LOCK *lock);
 extern int ProcLockWakeup(PROC_QUEUE *queue, LOCKMETHOD lockmethod,
                           LOCK *lock);
 extern void ProcAddLock(SHM_QUEUE *elem);