]> git.ipfire.org Git - thirdparty/postgresql.git/commitdiff
I finally understood what sinvaladt.c is doing --- and it
authorTom Lane <tgl@sss.pgh.pa.us>
Mon, 6 Sep 1999 19:37:38 +0000 (19:37 +0000)
committerTom Lane <tgl@sss.pgh.pa.us>
Mon, 6 Sep 1999 19:37:38 +0000 (19:37 +0000)
offended my aesthestic sensibility that there was so much unreadable code
doing so little.  Rewritten code is about half the size, faster, and
(I hope) much more intelligible.

src/backend/storage/ipc/sinval.c
src/backend/storage/ipc/sinvaladt.c
src/include/storage/lock.h
src/include/storage/sinvaladt.h

index e993cef74aa8c05945ab9cf320ef13154110e188..c1a557033b6b8d6177d73ce0bc15f14db8b13c87 100644 (file)
@@ -7,7 +7,7 @@
  *
  *
  * IDENTIFICATION
- *       $Header: /cvsroot/pgsql/src/backend/storage/ipc/sinval.c,v 1.17 1999/09/04 18:36:45 tgl Exp $
+ *       $Header: /cvsroot/pgsql/src/backend/storage/ipc/sinval.c,v 1.18 1999/09/06 19:37:38 tgl Exp $
  *
  *-------------------------------------------------------------------------
  */
 #include "storage/sinval.h"
 #include "storage/sinvaladt.h"
 
-extern SISeg *shmInvalBuffer;  /* the shared buffer segment, set by
-                                                                * SISegmentAttach()
-                                                                */
-extern BackendId MyBackendId;
-extern BackendTag MyBackendTag;
-
 SPINLOCK       SInvalLock = (SPINLOCK) NULL;
 
 /****************************************************************************/
@@ -39,11 +33,6 @@ CreateSharedInvalidationState(IPCKey key, int maxBackends)
 {
        int                     status;
 
-       /*
-        * REMOVED SISyncKill(IPCKeyGetSIBufferMemorySemaphoreKey(key));
-        * SISyncInit(IPCKeyGetSIBufferMemorySemaphoreKey(key));
-        */
-
        /* SInvalLock gets set in spin.c, during spinlock init */
        status = SISegmentInit(true, IPCKeyGetSIBufferMemoryBlock(key),
                                                   maxBackends);
@@ -53,9 +42,9 @@ CreateSharedInvalidationState(IPCKey key, int maxBackends)
 }
 
 /****************************************************************************/
-/*     AttachSharedInvalidationState(key)       Attach a buffer segment                        */
+/*     AttachSharedInvalidationState(key)       Attach to existing buffer segment      */
 /*                                                                                                                                                     */
-/*     should be called only by the POSTMASTER                                                                 */
+/*     should be called by each backend during startup                                                 */
 /****************************************************************************/
 void
 AttachSharedInvalidationState(IPCKey key)
@@ -74,6 +63,11 @@ AttachSharedInvalidationState(IPCKey key)
                elog(FATAL, "AttachSharedInvalidationState: failed segment init");
 }
 
+/*
+ * InitSharedInvalidationState
+ *             Initialize new backend's state info in buffer segment.
+ *             Must be called after AttachSharedInvalidationState().
+ */
 void
 InitSharedInvalidationState(void)
 {
@@ -88,24 +82,19 @@ InitSharedInvalidationState(void)
 
 /*
  * RegisterSharedInvalid
- *     Returns a new local cache invalidation state containing a new entry.
+ *     Add a shared-cache-invalidation message to the global SI message queue.
  *
  * Note:
  *     Assumes hash index is valid.
  *     Assumes item pointer is valid.
  */
-/****************************************************************************/
-/*     RegisterSharedInvalid(cacheId, hashIndex, pointer)                                              */
-/*                                                                                                                                                     */
-/*     register a message in the buffer                                                                                */
-/*     should be called by a backend                                                                                   */
-/****************************************************************************/
 void
 RegisterSharedInvalid(int cacheId,             /* XXX */
                                          Index hashIndex,
                                          ItemPointer pointer)
 {
-       SharedInvalidData newInvalid;
+       SharedInvalidData       newInvalid;
+       bool                            insertOK;
 
        /*
         * This code has been hacked to accept two types of messages.  This
@@ -127,34 +116,16 @@ RegisterSharedInvalid(int cacheId,                /* XXX */
                ItemPointerSetInvalid(&newInvalid.pointerData);
 
        SpinAcquire(SInvalLock);
-       while (!SISetDataEntry(shmInvalBuffer, &newInvalid))
-       {
-               /* buffer full */
-               /* release a message, mark process cache states to be invalid */
-               SISetProcStateInvalid(shmInvalBuffer);
-
-               if (!SIDelDataEntries(shmInvalBuffer, 1))
-               {
-                       /* inconsistent buffer state -- shd never happen */
-                       SpinRelease(SInvalLock);
-                       elog(FATAL, "RegisterSharedInvalid: inconsistent buffer state");
-               }
-
-               /* loop around to try write again */
-       }
+       insertOK = SIInsertDataEntry(shmInvalBuffer, &newInvalid);
        SpinRelease(SInvalLock);
+       if (! insertOK)
+               elog(NOTICE, "RegisterSharedInvalid: SI buffer overflow");
 }
 
 /*
  * InvalidateSharedInvalid
- *     Processes all entries in a shared cache invalidation state.
+ *             Process shared-cache-invalidation messages waiting for this backend
  */
-/****************************************************************************/
-/*     InvalidateSharedInvalid(invalFunction, resetFunction)                                   */
-/*                                                                                                                                                     */
-/*     invalidate a message in the buffer       (read and clean up)                            */
-/*     should be called by a backend                                                                                   */
-/****************************************************************************/
 void
 InvalidateSharedInvalid(void (*invalFunction) (),
                                                void (*resetFunction) ())
index 2e64d027f31c7c7eb7b78967b7991c400286046d..99426693cd1b8512e8ad1bc1691e5a13ee65241e 100644 (file)
@@ -7,7 +7,7 @@
  *
  *
  * IDENTIFICATION
- *       $Header: /cvsroot/pgsql/src/backend/storage/ipc/sinvaladt.c,v 1.24 1999/09/04 18:36:45 tgl Exp $
+ *       $Header: /cvsroot/pgsql/src/backend/storage/ipc/sinvaladt.c,v 1.25 1999/09/06 19:37:38 tgl Exp $
  *
  *-------------------------------------------------------------------------
  */
 
 #include "postgres.h"
 
+#include "miscadmin.h"
 #include "storage/backendid.h"
 #include "storage/lmgr.h"
+#include "storage/sinvaladt.h"
 #include "utils/trace.h"
 
-/* ----------------
- *             global variable notes
- *
- *             SharedInvalidationSemaphore
- *
- *             shmInvalBuffer
- *                             the shared buffer segment, set by SISegmentAttach()
- *
- *             MyBackendId
- *                             might be removed later, used only for
- *                             debugging in debug routines (end of file)
- *
- *             SIDbId
- *                             identification of buffer (disappears)
- *
- *             SIRelId                 \
- *             SIDummyOid               \      identification of buffer
- *             SIXidData                /
- *             SIXid                   /
- *
- *     XXX This file really needs to be cleaned up.  We switched to using
- *             spinlocks to protect critical sections (as opposed to using fake
- *             relations and going through the lock manager) and some of the old
- *             cruft was 'ifdef'ed out, while other parts (now unused) are still
- *             compiled into the system. -mer 5/24/92
- * ----------------
- */
-#ifdef HAS_TEST_AND_SET
-int                    SharedInvalidationLockId;
-
-#else
-IpcSemaphoreId SharedInvalidationSemaphore;
-
-#endif
-
 SISeg     *shmInvalBuffer;
-extern BackendId MyBackendId;
 
-static void CleanupInvalidationState(int status, SISeg *segInOutP);
-static BackendId SIAssignBackendId(SISeg *segInOutP, BackendTag backendTag);
-static int     SIGetNumEntries(SISeg *segP);
+static void SISegmentAttach(IpcMemoryId shmid);
+static void SISegInit(SISeg *segP, int maxBackends);
+static void CleanupInvalidationState(int status, SISeg *segP);
+static void SISetProcStateInvalid(SISeg *segP);
 
-/************************************************************************/
-/* SISetActiveProcess(segP, backendId) set the backend status active   */
-/*             should be called only by the postmaster when creating a backend */
-/************************************************************************/
-/* XXX I suspect that the segP parameter is extraneous. -hirohama */
-static void
-SISetActiveProcess(SISeg *segInOutP, BackendId backendId)
-{
-       /* mark all messages as read */
-
-       /* Assert(segP->procState[backendId - 1].tag == MyBackendTag); */
-
-       segInOutP->procState[backendId - 1].resetState = false;
-       segInOutP->procState[backendId - 1].limit = SIGetNumEntries(segInOutP);
-}
-
-/****************************************************************************/
-/* SIBackendInit()     initializes a backend to operate on the buffer                  */
-/****************************************************************************/
+/*
+ * SISegmentInit
+ *             Create a new SI memory segment, or attach to an existing one
+ *
+ * This is called with createNewSegment = true by the postmaster (or by
+ * a standalone backend), and subsequently with createNewSegment = false
+ * by backends started by the postmaster.
+ *
+ * Note: maxBackends param is only valid when createNewSegment is true
+ */
 int
-SIBackendInit(SISeg *segInOutP)
+SISegmentInit(bool createNewSegment, IPCKey key, int maxBackends)
 {
-       LockRelId       LtCreateRelId();
-       TransactionId LMITransactionIdCopy();
-
-       Assert(MyBackendTag > 0);
-
-       MyBackendId = SIAssignBackendId(segInOutP, MyBackendTag);
-       if (MyBackendId == InvalidBackendTag)
-               return 0;
-
-#ifdef INVALIDDEBUG
-       elog(DEBUG, "SIBackendInit: backend tag %d; backend id %d.",
-                MyBackendTag, MyBackendId);
-#endif  /* INVALIDDEBUG */
+       int                     segSize;
+       IpcMemoryId shmId;
 
-       SISetActiveProcess(segInOutP, MyBackendId);
-       on_shmem_exit(CleanupInvalidationState, (caddr_t) segInOutP);
-       return 1;
-}
+       if (createNewSegment)
+       {
+               /* Kill existing segment, if any */
+               IpcMemoryKill(key);
 
-/* ----------------
- *             SIAssignBackendId
- * ----------------
- */
-static BackendId
-SIAssignBackendId(SISeg *segInOutP, BackendTag backendTag)
-{
-       Index           index;
-       ProcState  *stateP = NULL;
+               /* Figure space needed.
+                * Note sizeof(SISeg) includes the first ProcState entry.
+                */
+               segSize = sizeof(SISeg) + sizeof(ProcState) * (maxBackends - 1);
 
-       for (index = 0; index < segInOutP->maxBackends; index++)
-       {
-               if (segInOutP->procState[index].tag == InvalidBackendTag ||
-                       segInOutP->procState[index].tag == backendTag)
+               /* Get a shared segment */
+               shmId = IpcMemoryCreate(key, segSize, IPCProtection);
+               if (shmId < 0)
                {
-                       stateP = &segInOutP->procState[index];
-                       break;
+                       perror("SISegmentInit: segment create failed");
+                       return -1;                      /* an error */
                }
 
-               if (!PointerIsValid(stateP) ||
-                       (segInOutP->procState[index].resetState &&
-                        (!stateP->resetState ||
-                         stateP->tag < backendTag)) ||
-                       (!stateP->resetState &&
-                        (segInOutP->procState[index].limit <
-                         stateP->limit ||
-                         stateP->tag < backendTag)))
-                       stateP = &segInOutP->procState[index];
-       }
-
-       /* verify that all "procState" entries checked for matching tags */
+               /* Attach to the shared cache invalidation segment */
+               /* sets the global variable shmInvalBuffer */
+               SISegmentAttach(shmId);
 
-       for (index++; index < segInOutP->maxBackends; index++)
-       {
-               if (segInOutP->procState[index].tag == backendTag)
-                       elog(FATAL, "SIAssignBackendId: tag %d found twice", backendTag);
+               /* Init shared memory contents */
+               SISegInit(shmInvalBuffer, maxBackends);
        }
-
-       Assert(stateP);
-
-       if (stateP->tag != InvalidBackendTag)
+       else
        {
-               if (stateP->tag == backendTag)
-                       elog(NOTICE, "SIAssignBackendId: reusing tag %d", backendTag);
-               else
+               /* find existing segment */
+               shmId = IpcMemoryIdGet(key, 0);
+               if (shmId < 0)
                {
-                       elog(NOTICE, "SIAssignBackendId: discarding tag %d", stateP->tag);
-                       return InvalidBackendTag;
+                       perror("SISegmentInit: segment get failed");
+                       return -1;                      /* an error */
                }
-       }
-
-       stateP->tag = backendTag;
-
-       return 1 + stateP - &segInOutP->procState[0];
-}
-
-
-/************************************************************************/
-/* The following function should be called only by the postmaster !!   */
-/************************************************************************/
 
-/************************************************************************/
-/* SISetDeadProcess(segP, backendId)  set the backend status DEAD              */
-/*             should be called only by the postmaster when a backend died             */
-/************************************************************************/
-static void
-SISetDeadProcess(SISeg *segP, int backendId)
-{
-       /* XXX call me.... */
-
-       segP->procState[backendId - 1].resetState = false;
-       segP->procState[backendId - 1].limit = -1;
-       segP->procState[backendId - 1].tag = InvalidBackendTag;
+               /* Attach to the shared cache invalidation segment */
+               /* sets the global variable shmInvalBuffer */
+               SISegmentAttach(shmId);
+       }
+       return 1;
 }
 
 /*
- * CleanupInvalidationState
- * Note:
- *             This is a temporary hack.  ExitBackend should call this instead
- *             of exit (via on_shmem_exit).
+ * SISegmentAttach
+ *             Attach to specified shared memory segment
  */
 static void
-CleanupInvalidationState(int status,   /* XXX */
-                                                SISeg *segInOutP)              /* XXX style */
-{
-       Assert(PointerIsValid(segInOutP));
-
-       SISetDeadProcess(segInOutP, MyBackendId);
-}
-
-
-/************************************************************************/
-/* SIComputeSize()     - compute size and offsets for SI segment                       */
-/************************************************************************/
-static void
-SIComputeSize(SISegOffsets *oP, int maxBackends)
-{
-       int                     A,
-                               B,
-                               a,
-                               b,
-                               totalSize;
-
-       A = 0;
-       /* sizeof(SISeg) includes the first ProcState entry */
-       a = sizeof(SISeg) + sizeof(ProcState) * (maxBackends - 1);
-       a = MAXALIGN(a);                        /* offset to first data entry */
-       b = sizeof(SISegEntry) * MAXNUMMESSAGES;
-       B = A + a + b;
-       B = MAXALIGN(B);
-       totalSize = B - A;
-
-       oP->startSegment = A;
-       oP->offsetToFirstEntry = a; /* relative to A */
-       oP->offsetToEndOfSegment = totalSize;           /* relative to A */
-}
-
-
-/************************************************************************/
-/* SISetStartEntrySection(segP, offset)                - sets the offset                       */
-/************************************************************************/
-static void
-SISetStartEntrySection(SISeg *segP, Offset offset)
-{
-       segP->startEntrySection = offset;
-}
-
-/************************************************************************/
-/* SIGetStartEntrySection(segP)                - returnss the offset                           */
-/************************************************************************/
-static Offset
-SIGetStartEntrySection(SISeg *segP)
+SISegmentAttach(IpcMemoryId shmid)
 {
-       return segP->startEntrySection;
-}
+       shmInvalBuffer = (SISeg *) IpcMemoryAttach(shmid);
 
-
-/************************************************************************/
-/* SISetEndEntrySection(segP, offset)  - sets the offset                               */
-/************************************************************************/
-static void
-SISetEndEntrySection(SISeg *segP, Offset offset)
-{
-       segP->endEntrySection = offset;
+       if (shmInvalBuffer == IpcMemAttachFailed)
+       {
+               /* XXX use validity function */
+               elog(FATAL, "SISegmentAttach: Could not attach segment: %m");
+       }
 }
 
-/************************************************************************/
-/* SISetEndEntryChain(segP, offset)            - sets the offset                               */
-/************************************************************************/
+/*
+ * SISegInit
+ *             Initialize contents of a new shared memory sinval segment
+ */
 static void
-SISetEndEntryChain(SISeg *segP, Offset offset)
+SISegInit(SISeg *segP, int maxBackends)
 {
-       segP->endEntryChain = offset;
-}
-
-/************************************************************************/
-/* SIGetEndEntryChain(segP)            - returnss the offset                                   */
-/************************************************************************/
-static Offset
-SIGetEndEntryChain(SISeg *segP)
-{
-       return segP->endEntryChain;
-}
+       int                     i;
 
-/************************************************************************/
-/* SISetStartEntryChain(segP, offset)  - sets the offset                               */
-/************************************************************************/
-static void
-SISetStartEntryChain(SISeg *segP, Offset offset)
-{
-       segP->startEntryChain = offset;
-}
+       /* Clear message counters, save size of procState array */
+       segP->minMsgNum = 0;
+       segP->maxMsgNum = 0;
+       segP->maxBackends = maxBackends;
 
-/************************************************************************/
-/* SIGetStartEntryChain(segP)  - returns  the offset                                   */
-/************************************************************************/
-static Offset
-SIGetStartEntryChain(SISeg *segP)
-{
-       return segP->startEntryChain;
-}
+       /* The buffer[] array is initially all unused, so we need not fill it */
 
-/************************************************************************/
-/* SISetNumEntries(segP, num)  sets the current nuber of entries               */
-/************************************************************************/
-static bool
-SISetNumEntries(SISeg *segP, int num)
-{
-       if (num <= MAXNUMMESSAGES)
-       {
-               segP->numEntries = num;
-               return true;
-       }
-       else
+       /* Mark all backends inactive */
+       for (i = 0; i < maxBackends; i++)
        {
-               return false;                   /* table full */
+               segP->procState[i].nextMsgNum = -1;     /* inactive */
+               segP->procState[i].resetState = false;
+               segP->procState[i].tag = InvalidBackendTag;
        }
 }
 
-/************************************************************************/
-/* SIGetNumEntries(segP)       - returns the current nuber of entries          */
-/************************************************************************/
-static int
-SIGetNumEntries(SISeg *segP)
-{
-       return segP->numEntries;
-}
-
-
-/************************************************************************/
-/* SISetMaxNumEntries(segP, num)       sets the maximal number of entries      */
-/************************************************************************/
-static bool
-SISetMaxNumEntries(SISeg *segP, int num)
+/*
+ * SIBackendInit
+ *             Initialize a new backend to operate on the sinval buffer
+ *
+ * NB: this routine, and all following ones, must be executed with the
+ * SInvalLock spinlock held, since there may be multiple backends trying
+ * to access the buffer.
+ */
+int
+SIBackendInit(SISeg *segP)
 {
-       if (num <= MAXNUMMESSAGES)
-       {
-               segP->maxNumEntries = num;
-               return true;
-       }
-       else
-       {
-               return false;                   /* wrong number */
-       }
-}
-
-
-/************************************************************************/
-/* SIGetProcStateLimit(segP, i) returns the limit of read messages             */
-/************************************************************************/
-
-#define SIGetProcStateLimit(segP,i) \
-               ((segP)->procState[i].limit)
+       Index           index;
+       ProcState  *stateP = NULL;
 
-/************************************************************************/
-/* SIIncNumEntries(segP, num)  increments the current nuber of entries */
-/************************************************************************/
-static bool
-SIIncNumEntries(SISeg *segP, int num)
-{
+       Assert(MyBackendTag > 0);
 
-       /*
-        * Try to prevent table overflow. When the table is 70% full send a
-        * SIGUSR2 to the postmaster which will send it back to all the
-        * backends. This will be handled by Async_NotifyHandler() with a
-        * StartTransactionCommand() which will flush unread SI entries for
-        * each backend.                                                                        dz - 27 Jan 1998
-        */
-       if (segP->numEntries == (MAXNUMMESSAGES * 70 / 100))
+       /* Check for duplicate backend tags (should never happen) */
+       for (index = 0; index < segP->maxBackends; index++)
        {
-               TPRINTF(TRACE_VERBOSE,
-                       "SIIncNumEntries: table is 70%% full, signaling postmaster");
-               kill(getppid(), SIGUSR2);
+               if (segP->procState[index].tag == MyBackendTag)
+                       elog(FATAL, "SIBackendInit: tag %d already in use", MyBackendTag);
        }
 
-       if ((segP->numEntries + num) <= MAXNUMMESSAGES)
-       {
-               segP->numEntries = segP->numEntries + num;
-               return true;
-       }
-       else
+       /* Look for a free entry in the procState array */
+       for (index = 0; index < segP->maxBackends; index++)
        {
-               return false;                   /* table full */
+               if (segP->procState[index].tag == InvalidBackendTag)
+               {
+                       stateP = &segP->procState[index];
+                       break;
+               }
        }
-}
 
-/************************************************************************/
-/* SIDecNumEntries(segP, num)  decrements the current nuber of entries */
-/************************************************************************/
-static bool
-SIDecNumEntries(SISeg *segP, int num)
-{
-       if ((segP->numEntries - num) >= 0)
-       {
-               segP->numEntries = segP->numEntries - num;
-               return true;
-       }
-       else
+       /* elog() with spinlock held is probably not too cool, but these
+        * conditions should never happen anyway.
+        */
+       if (stateP == NULL)
        {
-               return false;                   /* not enough entries in table */
+               elog(NOTICE, "SIBackendInit: no free procState slot available");
+               MyBackendId = InvalidBackendTag;
+               return 0;
        }
-}
 
-/************************************************************************/
-/* SISetStartFreeSpace(segP, offset)  - sets the offset                                        */
-/************************************************************************/
-static void
-SISetStartFreeSpace(SISeg *segP, Offset offset)
-{
-       segP->startFreeSpace = offset;
-}
-
-/************************************************************************/
-/* SIGetStartFreeSpace(segP)  - returns the offset                                             */
-/************************************************************************/
-static Offset
-SIGetStartFreeSpace(SISeg *segP)
-{
-       return segP->startFreeSpace;
-}
+       MyBackendId = (stateP - &segP->procState[0]) + 1;
 
+#ifdef INVALIDDEBUG
+       elog(DEBUG, "SIBackendInit: backend tag %d; backend id %d.",
+                MyBackendTag, MyBackendId);
+#endif  /* INVALIDDEBUG */
 
+       /* mark myself active, with all extant messages already read */
+       stateP->tag = MyBackendTag;
+       stateP->resetState = false;
+       stateP->nextMsgNum = segP->maxMsgNum;
 
-/************************************************************************/
-/* SIGetFirstDataEntry(segP)  returns first data entry                                 */
-/************************************************************************/
-static SISegEntry *
-SIGetFirstDataEntry(SISeg *segP)
-{
-       SISegEntry *eP;
-       Offset          startChain;
-
-       startChain = SIGetStartEntryChain(segP);
-
-       if (startChain == InvalidOffset)
-               return NULL;
-
-       eP = (SISegEntry *) ((Pointer) segP +
-                                                SIGetStartEntrySection(segP) +
-                                                startChain);
-       return eP;
-}
-
-
-/************************************************************************/
-/* SIGetLastDataEntry(segP)  returns last data entry in the chain              */
-/************************************************************************/
-static SISegEntry *
-SIGetLastDataEntry(SISeg *segP)
-{
-       SISegEntry *eP;
-       Offset          endChain;
-
-       endChain = SIGetEndEntryChain(segP);
-
-       if (endChain == InvalidOffset)
-               return NULL;
-
-       eP = (SISegEntry *) ((Pointer) segP +
-                                                SIGetStartEntrySection(segP) +
-                                                endChain);
-       return eP;
-}
-
-/************************************************************************/
-/* SIGetNextDataEntry(segP, offset)  returns next data entry                   */
-/************************************************************************/
-#define SIGetNextDataEntry(segP,offset) \
-       (((offset) == InvalidOffset) ? (SISegEntry *) NULL : \
-        (SISegEntry *) ((Pointer) (segP) + \
-                                        (segP)->startEntrySection + \
-                                        (Offset) (offset)))
-
-/************************************************************************/
-/* SIGetNthDataEntry(segP, n)  returns the n-th data entry in chain    */
-/************************************************************************/
-static SISegEntry *
-SIGetNthDataEntry(SISeg *segP,
-                                 int n)                /* must range from 1 to MaxMessages */
-{
-       SISegEntry *eP;
-       int                     i;
-
-       if (n <= 0)
-               return NULL;
-
-       eP = SIGetFirstDataEntry(segP);
-       for (i = 1; i < n; i++)
-       {
-               /* skip one and get the next    */
-               eP = SIGetNextDataEntry(segP, eP->next);
-       }
-
-       return eP;
-}
-
-/************************************************************************/
-/* SIEntryOffset(segP, entryP)  returns the offset for an pointer              */
-/************************************************************************/
-static Offset
-SIEntryOffset(SISeg *segP, SISegEntry *entryP)
-{
-       /* relative to B !! */
-       return ((Offset) ((Pointer) entryP -
-                                         (Pointer) segP -
-                                         SIGetStartEntrySection(segP)));
-}
-
+       /* register exit routine to mark my entry inactive at exit */
+       on_shmem_exit(CleanupInvalidationState, (caddr_t) segP);
 
-/************************************************************************/
-/* SISetDataEntry(segP, data)  - sets a message in the segemnt                 */
-/************************************************************************/
-bool
-SISetDataEntry(SISeg *segP, SharedInvalidData *data)
-{
-       Offset          offsetToNewData;
-       SISegEntry *eP,
-                          *lastP;
-
-       if (!SIIncNumEntries(segP, 1))
-               return false;                   /* no space */
-
-       /* get a free entry */
-       offsetToNewData = SIGetStartFreeSpace(segP);
-       eP = SIGetNextDataEntry(segP, offsetToNewData);         /* it's a free one */
-       SISetStartFreeSpace(segP, eP->next);
-       /* fill it up */
-       eP->entryData = *data;
-       eP->isfree = false;
-       eP->next = InvalidOffset;
-
-       /* handle insertion point at the end of the chain !! */
-       lastP = SIGetLastDataEntry(segP);
-       if (lastP == NULL)
-       {
-               /* there is no chain, insert the first entry */
-               SISetStartEntryChain(segP, SIEntryOffset(segP, eP));
-       }
-       else
-       {
-               /* there is a last entry in the chain */
-               lastP->next = SIEntryOffset(segP, eP);
-       }
-       SISetEndEntryChain(segP, SIEntryOffset(segP, eP));
-       return true;
+       return 1;
 }
 
-
-/************************************************************************/
-/* SIDecProcLimit(segP, num)  decrements all process limits                            */
-/************************************************************************/
+/*
+ * CleanupInvalidationState
+ *             Mark the current backend as no longer active.
+ *
+ * This function is called via on_shmem_exit() during backend shutdown.
+ */
 static void
-SIDecProcLimit(SISeg *segP, int num)
+CleanupInvalidationState(int status,
+                                                SISeg *segP)
 {
-       int                     i;
+       Assert(PointerIsValid(segP));
 
-       for (i = 0; i < segP->maxBackends; i++)
-       {
-               /* decrement only, if there is a limit > 0      */
-               if (segP->procState[i].limit > 0)
-               {
-                       segP->procState[i].limit = segP->procState[i].limit - num;
-                       if (segP->procState[i].limit < 0)
-                       {
-                               /* limit was not high enough, reset to zero */
-                               /* negative means it's a dead backend       */
-                               segP->procState[i].limit = 0;
-                       }
-               }
-       }
-}
+       /* XXX we probably oughta grab the SInval spinlock for this...
+        * but I think it is safe not to.
+        */
 
+       segP->procState[MyBackendId - 1].nextMsgNum = -1;
+       segP->procState[MyBackendId - 1].resetState = false;
+       segP->procState[MyBackendId - 1].tag = InvalidBackendTag;
+}
 
-/************************************************************************/
-/* SIDelDataEntries(segP, n)           - free the FIRST n entries                      */
-/************************************************************************/
+/*
+ * SIInsertDataEntry
+ *             Add a new invalidation message to the buffer.
+ *
+ * If we are unable to insert the message because the buffer is full,
+ * then clear the buffer and assert the "reset" flag to each backend.
+ * This will cause all the backends to discard *all* invalidatable state.
+ *
+ * Returns true for normal successful insertion, false if had to reset.
+ */
 bool
-SIDelDataEntries(SISeg *segP, int n)
+SIInsertDataEntry(SISeg *segP, SharedInvalidData *data)
 {
-       int                     i;
-
-       if (n <= 0)
-               return false;
+       int             numMsgs = segP->maxMsgNum - segP->minMsgNum;
 
-       if (!SIDecNumEntries(segP, n))
+       /* Is the buffer full? */
+       if (numMsgs >= MAXNUMMESSAGES)
        {
-               /* not that many entries in buffer */
+               /* Yes, so force reset */
+               SISetProcStateInvalid(segP);
                return false;
        }
 
-       for (i = 1; i <= n; i++)
+       /*
+        * Try to prevent table overflow.  When the table is 70% full send a
+        * SIGUSR2 (ordinarily a NOTIFY signal) to the postmaster, which will
+        * send it back to all the backends.  This will force idle backends to
+        * execute a transaction to look through pg_listener for NOTIFY messages,
+        * and as a byproduct of the transaction start they will read SI entries.
+        *
+        * This should never happen if all the backends are actively executing
+        * queries, but if a backend is sitting idle then it won't be starting
+        * transactions and so won't be reading SI entries.
+        *
+        * dz - 27 Jan 1998
+        */
+       if (numMsgs == (MAXNUMMESSAGES * 70 / 100) &&
+               IsUnderPostmaster)
        {
-               SISegEntry *e1P = SIGetFirstDataEntry(segP);
-               SISetStartEntryChain(segP, e1P->next);
-               if (SIGetStartEntryChain(segP) == InvalidOffset)
-               {
-                       /* it was the last entry */
-                       SISetEndEntryChain(segP, InvalidOffset);
-               }
-               /* free the entry */
-               e1P->isfree = true;
-               e1P->next = SIGetStartFreeSpace(segP);
-               SISetStartFreeSpace(segP, SIEntryOffset(segP, e1P));
+               TPRINTF(TRACE_VERBOSE,
+                               "SIInsertDataEntry: table is 70%% full, signaling postmaster");
+               kill(getppid(), SIGUSR2);
        }
 
-       SIDecProcLimit(segP, n);
+       /*
+        * Insert new message into proper slot of circular buffer
+        */
+       segP->buffer[segP->maxMsgNum % MAXNUMMESSAGES] = *data;
+       segP->maxMsgNum++;
+
        return true;
 }
 
-
-
-/************************************************************************/
-/* SISetProcStateInvalid(segP) checks and marks a backends state as    */
-/*                                                                     invalid                                                         */
-/************************************************************************/
-void
+/*
+ * SISetProcStateInvalid
+ *             Flush pending messages from buffer, assert reset flag for each backend
+ *
+ * This is used only to recover from SI buffer overflow.
+ */
+static void
 SISetProcStateInvalid(SISeg *segP)
 {
        int                     i;
 
+       segP->minMsgNum = 0;
+       segP->maxMsgNum = 0;
+
        for (i = 0; i < segP->maxBackends; i++)
        {
-               if (segP->procState[i].limit == 0)
+               if (segP->procState[i].nextMsgNum >= 0) /* active backend? */
                {
-                       /* backend i didn't read any message                             */
                        segP->procState[i].resetState = true;
-
-                       /*
-                        * XXX signal backend that it has to reset its internal cache
-                        * ?
-                        */
+                       segP->procState[i].nextMsgNum = 0;
                }
        }
 }
 
-/************************************************************************/
-/* SIGetDataEntry(segP, backendId, data)                                                               */
-/*             get next SI message for specified backend, if there is one              */
-/*                                                                                                                                             */
-/*             Possible return values:                                                                                 */
-/*                     0: no SI message available                                                                      */
-/*                     1: next SI message has been extracted into *data                        */
-/*                             (there may be more messages available after this one!)  */
-/*                -1: SI reset message extracted                                                               */
-/************************************************************************/
+/*
+ * SIGetDataEntry
+ *             get next SI message for specified backend, if there is one
+ *
+ * Possible return values:
+ *     0: no SI message available
+ *     1: next SI message has been extracted into *data
+ *             (there may be more messages available after this one!)
+ * -1: SI reset message extracted
+ */
 int
 SIGetDataEntry(SISeg *segP, int backendId,
                           SharedInvalidData *data)
 {
-       SISegEntry *msg;
+       ProcState  *stateP = & segP->procState[backendId - 1];
 
-       Assert(segP->procState[backendId - 1].tag == MyBackendTag);
+       Assert(stateP->tag == MyBackendTag);
 
-       if (segP->procState[backendId - 1].resetState)
+       if (stateP->resetState)
        {
-               /* new valid state--mark all messages "read" */
-               segP->procState[backendId - 1].resetState = false;
-               segP->procState[backendId - 1].limit = SIGetNumEntries(segP);
+               /* Force reset.  We can say we have dealt with any messages added
+                * since the reset, as well...
+                */
+               stateP->resetState = false;
+               stateP->nextMsgNum = segP->maxMsgNum;
                return -1;
        }
 
-       /* Get next message for this backend, if any */
-
-       /* This is fairly inefficient if there are many messages,
-        * but normally there should not be...
-        */
-       msg = SIGetNthDataEntry(segP,
-                                                       SIGetProcStateLimit(segP, backendId - 1) + 1);
-
-       if (msg == NULL)
+       if (stateP->nextMsgNum >= segP->maxMsgNum)
                return 0;                               /* nothing to read */
 
-       *data = msg->entryData;         /* return contents of message */
-
-       segP->procState[backendId - 1].limit++;         /* one more message read */
+       /*
+        * Retrieve message and advance my counter.
+        */
+       *data = segP->buffer[stateP->nextMsgNum % MAXNUMMESSAGES];
+       stateP->nextMsgNum++;
 
        /* There may be other backends that haven't read the message,
         * so we cannot delete it here.
@@ -666,9 +331,10 @@ SIGetDataEntry(SISeg *segP, int backendId,
        return 1;                                       /* got a message */
 }
 
-/************************************************************************/
-/* SIDelExpiredDataEntries     (segP)  - removes irrelevant messages           */
-/************************************************************************/
+/*
+ * SIDelExpiredDataEntries
+ *             Remove messages that have been consumed by all active backends
+ */
 void
 SIDelExpiredDataEntries(SISeg *segP)
 {
@@ -676,161 +342,34 @@ SIDelExpiredDataEntries(SISeg *segP)
                                i,
                                h;
 
-       min = 9999999;
+       min = segP->maxMsgNum;
+       if (min == segP->minMsgNum)
+               return;                                 /* fast path if no messages exist */
+
+       /* Recompute minMsgNum = minimum of all backends' nextMsgNum */
+
        for (i = 0; i < segP->maxBackends; i++)
        {
-               h = SIGetProcStateLimit(segP, i);
+               h = segP->procState[i].nextMsgNum;
                if (h >= 0)
                {                                               /* backend active */
                        if (h < min)
                                min = h;
                }
        }
-       if (min < 9999999 && min > 0)
-       {
-               /* we can remove min messages */
-               /* this adjusts also the state limits! */
-               if (!SIDelDataEntries(segP, min))
-                       elog(FATAL, "SIDelExpiredDataEntries: Invalid segment state");
-       }
-}
-
-
-
-/************************************************************************/
-/* SISegInit(segP)     - initializes the segment                                                       */
-/************************************************************************/
-static void
-SISegInit(SISeg *segP, SISegOffsets *oP, int maxBackends)
-{
-       int                     i;
-       SISegEntry *eP;
-
-       /* set semaphore ids in the segment */
-       /* XXX */
-       SISetStartEntrySection(segP, oP->offsetToFirstEntry);
-       SISetEndEntrySection(segP, oP->offsetToEndOfSegment);
-       SISetStartFreeSpace(segP, 0);
-       SISetStartEntryChain(segP, InvalidOffset);
-       SISetEndEntryChain(segP, InvalidOffset);
-       SISetNumEntries(segP, 0);
-       SISetMaxNumEntries(segP, MAXNUMMESSAGES);
-       segP->maxBackends = maxBackends;
-       for (i = 0; i < segP->maxBackends; i++)
-       {
-               segP->procState[i].limit = -1;  /* no backend active  !! */
-               segP->procState[i].resetState = false;
-               segP->procState[i].tag = InvalidBackendTag;
-       }
-       /* construct a chain of free entries                                                    */
-       for (i = 1; i < MAXNUMMESSAGES; i++)
-       {
-               eP = (SISegEntry *) ((Pointer) segP +
-                                                        SIGetStartEntrySection(segP) +
-                                                        (i - 1) * sizeof(SISegEntry));
-               eP->isfree = true;
-               eP->next = i * sizeof(SISegEntry);              /* relative to B */
-       }
-       /* handle the last free entry separate                                                  */
-       eP = (SISegEntry *) ((Pointer) segP +
-                                                SIGetStartEntrySection(segP) +
-                                                (MAXNUMMESSAGES - 1) * sizeof(SISegEntry));
-       eP->isfree = true;
-       eP->next = InvalidOffset;       /* it's the end of the chain !! */
-}
-
-
-
-/************************************************************************/
-/* SISegmentKill(key)  - kill any segment                                                              */
-/************************************************************************/
-static void
-SISegmentKill(int key)                 /* the corresponding key for the segment */
-{
-       IpcMemoryKill(key);
-}
-
-
-/************************************************************************/
-/* SISegmentGet(key, size)     - get a shared segment of size <size>           */
-/*                               returns a segment id                                                                  */
-/************************************************************************/
-static IpcMemoryId
-SISegmentGet(int key,                  /* the corresponding key for the segment */
-                        int size,                      /* size of segment in bytes                              */
-                        bool create)
-{
-       IpcMemoryId shmid;
-
-       if (create)
-               shmid = IpcMemoryCreate(key, size, IPCProtection);
-       else
-               shmid = IpcMemoryIdGet(key, size);
-       return shmid;
-}
-
-/************************************************************************/
-/* SISegmentAttach(shmid)      - attach a shared segment with id shmid         */
-/************************************************************************/
-static void
-SISegmentAttach(IpcMemoryId shmid)
-{
-       shmInvalBuffer = (struct SISeg *) IpcMemoryAttach(shmid);
-       if (shmInvalBuffer == IpcMemAttachFailed)
-       {
-               /* XXX use validity function */
-               elog(FATAL, "SISegmentAttach: Could not attach segment: %m");
-       }
-}
-
-
-/************************************************************************/
-/* SISegmentInit()                     initialize SI segment                                           */
-/*                                                                                                                                             */
-/* NB: maxBackends param is only valid when killExistingSegment is true        */
-/************************************************************************/
-int
-SISegmentInit(bool killExistingSegment, IPCKey key, int maxBackends)
-{
-       SISegOffsets offsets;
-       IpcMemoryId shmId;
-       bool            create;
-
-       if (killExistingSegment)
-       {
-               /* Kill existing segment */
-               /* set semaphore */
-               SISegmentKill(key);
-
-               /* Get a shared segment */
-               SIComputeSize(&offsets, maxBackends);
-               create = true;
-               shmId = SISegmentGet(key, offsets.offsetToEndOfSegment, create);
-               if (shmId < 0)
-               {
-                       perror("SISegmentGet: failed");
-                       return -1;                      /* an error */
-               }
+       segP->minMsgNum = min;
 
-               /* Attach the shared cache invalidation  segment */
-               /* sets the global variable shmInvalBuffer */
-               SISegmentAttach(shmId);
-
-               /* Init shared memory table */
-               SISegInit(shmInvalBuffer, &offsets, maxBackends);
-       }
-       else
+       /* When minMsgNum gets really large, decrement all message counters
+        * so as to forestall overflow of the counters.
+        */
+       if (min >= MSGNUMWRAPAROUND)
        {
-               /* use an existing segment */
-               create = false;
-               shmId = SISegmentGet(key, 0, create);
-               if (shmId < 0)
+               segP->minMsgNum -= MSGNUMWRAPAROUND;
+               segP->maxMsgNum -= MSGNUMWRAPAROUND;
+               for (i = 0; i < segP->maxBackends; i++)
                {
-                       perror("SISegmentGet: getting an existent segment failed");
-                       return -1;                      /* an error */
+                       if (segP->procState[i].nextMsgNum >= 0)
+                               segP->procState[i].nextMsgNum -= MSGNUMWRAPAROUND;
                }
-               /* Attach the shared cache invalidation segment */
-               SISegmentAttach(shmId);
        }
-       return 1;
 }
index 87b8538212fcd3f1f42d2aca1b8d57ccb6d4ad1e..8f0f834e0f4ff11147a606f7a5b40d1abd22e593 100644 (file)
@@ -6,16 +6,16 @@
  *
  * Copyright (c) 1994, Regents of the University of California
  *
- * $Id: lock.h,v 1.33 1999/07/16 17:07:38 momjian Exp $
+ * $Id: lock.h,v 1.34 1999/09/06 19:37:37 tgl Exp $
  *
  *-------------------------------------------------------------------------
  */
 #ifndef LOCK_H_
 #define LOCK_H_
 
+#include "storage/ipc.h"
 #include "storage/itemptr.h"
 #include "storage/shmem.h"
-#include "storage/sinvaladt.h"
 #include "utils/array.h"
 
 extern SPINLOCK LockMgrLock;
index e008e52d30fc57ea7c1240c9c4a89c369d418a05..b9d349a4c5755181ea80b3e1bef5cfa1a61434e3 100644 (file)
@@ -6,7 +6,7 @@
  *
  * Copyright (c) 1994, Regents of the University of California
  *
- * $Id: sinvaladt.h,v 1.17 1999/09/04 18:36:44 tgl Exp $
+ * $Id: sinvaladt.h,v 1.18 1999/09/06 19:37:37 tgl Exp $
  *
  *-------------------------------------------------------------------------
  */
 #include "storage/itemptr.h"
 
 /*
- * The structure of the shared cache invaidation segment
+ * The shared cache invalidation manager is responsible for transmitting
+ * invalidation messages between backends.  Any message sent by any backend
+ * must be delivered to all already-running backends before it can be
+ * forgotten.
  *
+ * Conceptually, the messages are stored in an infinite array, where
+ * maxMsgNum is the next array subscript to store a submitted message in,
+ * minMsgNum is the smallest array subscript containing a message not yet
+ * read by all backends, and we always have maxMsgNum >= minMsgNum.  (They
+ * are equal when there are no messages pending.)  For each active backend,
+ * there is a nextMsgNum pointer indicating the next message it needs to read;
+ * we have maxMsgNum >= nextMsgNum >= minMsgNum for every backend.
+ *
+ * In reality, the messages are stored in a circular buffer of MAXNUMMESSAGES
+ * entries.  We translate MsgNum values into circular-buffer indexes by
+ * computing MsgNum % MAXNUMMESSAGES (this should be fast as long as
+ * MAXNUMMESSAGES is a constant and a power of 2).  As long as maxMsgNum
+ * doesn't exceed minMsgNum by more than MAXNUMMESSAGES, we have enough space
+ * in the buffer.  If the buffer does overflow, we reset it to empty and
+ * force each backend to "reset", ie, discard all its invalidatable state.
+ *
+ * We would have problems if the MsgNum values overflow an integer, so
+ * whenever minMsgNum exceeds MSGNUMWRAPAROUND, we subtract MSGNUMWRAPAROUND
+ * from all the MsgNum variables simultaneously.  MSGNUMWRAPAROUND can be
+ * large so that we don't need to do this often.  It must be a multiple of
+ * MAXNUMMESSAGES so that the existing circular-buffer entries don't need
+ * to be moved when we do it.
  */
-/*
-A------------- Header info --------------
-       criticalSectionSemaphoreId
-       generalSemaphoreId
-       startEntrySection       (offset a)
-       endEntrySection         (offset a + b)
-       startFreeSpace          (offset relative to B)
-       startEntryChain         (offset relatiev to B)
-       endEntryChain           (offset relative to B)
-       numEntries
-       maxNumEntries
-       maxBackends
-       procState[maxBackends] --> limit
-                                                               resetState (bool)
-a                                                              tag (POSTID)
-B------------- Start entry section -------
-       SISegEntry      --> entryData --> ... (see      SharedInvalidData!)
-                                       isfree  (bool)
-                                       next  (offset to next entry in chain )
-b        .... (dynamically growing down)
-C----------------End shared segment -------
 
-*/
 
-/* Parameters (configurable)  *******************************************/
-#define MAXNUMMESSAGES 4000            /* maximum number of messages in seg */
+/*
+ * Configurable parameters.
+ *
+ * MAXNUMMESSAGES: max number of shared-inval messages we can buffer.
+ * Must be a power of 2 for speed.
+ *
+ * MSGNUMWRAPAROUND: how often to reduce MsgNum variables to avoid overflow.
+ * Must be a multiple of MAXNUMMESSAGES.  Should be large.
+ */
 
+#define MAXNUMMESSAGES 4096
+#define MSGNUMWRAPAROUND (MAXNUMMESSAGES * 4096)
 
-#define InvalidOffset  1000000000              /* a invalid offset  (End of
-                                                                                * chain) */
+/* The content of one shared-invalidation message */
+typedef struct SharedInvalidData
+{
+       int                     cacheId;                /* XXX */
+       Index           hashIndex;
+       ItemPointerData pointerData;
+} SharedInvalidData;
+
+typedef SharedInvalidData *SharedInvalid;
 
+/* Per-backend state in shared invalidation structure */
 typedef struct ProcState
 {
-       int                     limit;                  /* the number of read messages                  */
+       /* nextMsgNum is -1 in an inactive ProcState array entry. */
+       int                     nextMsgNum;             /* next message number to read, or -1 */
        bool            resetState;             /* true, if backend has to reset its state */
-       int                     tag;                    /* special tag, recieved from the
-                                                                * postmaster */
+       int                     tag;                    /* backend tag received from postmaster */
 } ProcState;
 
-
+/* Shared cache invalidation memory segment */
 typedef struct SISeg
 {
-       IpcSemaphoreId criticalSectionSemaphoreId;      /* semaphore id         */
-       IpcSemaphoreId generalSemaphoreId;      /* semaphore id         */
-       Offset          startEntrySection;              /* (offset a)                                   */
-       Offset          endEntrySection;/* (offset a + b)                               */
-       Offset          startFreeSpace; /* (offset relative to B)               */
-       Offset          startEntryChain;/* (offset relative to B)               */
-       Offset          endEntryChain;  /* (offset relative to B)               */
-       int                     numEntries;
-       int                     maxNumEntries;
+       /*
+        * General state information
+        */
+       int                     minMsgNum;              /* oldest message still needed */
+       int                     maxMsgNum;              /* next message number to be assigned */
        int                     maxBackends;    /* size of procState array */
        /*
+        * Circular buffer holding shared-inval messages
+        */
+       SharedInvalidData       buffer[MAXNUMMESSAGES];
+       /*
+        * Per-backend state info.
+        *
         * We declare procState as 1 entry because C wants a fixed-size array,
         * but actually it is maxBackends entries long.
         */
        ProcState       procState[1];   /* reflects the invalidation state */
-       /*
-        * The entry section begins after the end of the procState array.
-        * Everything there is controlled by offsets.
-        */
 } SISeg;
 
-typedef struct SharedInvalidData
-{
-       int                     cacheId;                /* XXX */
-       Index           hashIndex;
-       ItemPointerData pointerData;
-} SharedInvalidData;
-
-typedef SharedInvalidData *SharedInvalid;
-
-
-typedef struct SISegEntry
-{
-       SharedInvalidData entryData;/* the message data */
-       bool            isfree;                 /* entry free? */
-       Offset          next;                   /* offset to next entry */
-} SISegEntry;
-
-typedef struct SISegOffsets
-{
-       Offset          startSegment;   /* always 0 (for now) */
-       Offset          offsetToFirstEntry;             /* A + a = B */
-       Offset          offsetToEndOfSegment;   /* A + a + b */
-} SISegOffsets;
-
-
-/****************************************************************************/
-/* synchronization of the shared buffer access                                                         */
-/*       access to the buffer is synchronized by the lock manager !!                   */
-/****************************************************************************/
 
-#define SI_LockStartValue  255
-#define SI_SharedLock    (-1)
-#define SI_ExclusiveLock  (-255)
+extern SISeg *shmInvalBuffer;  /* pointer to the shared buffer segment,
+                                                                * set by SISegmentAttach()
+                                                                */
 
-extern SISeg *shmInvalBuffer;
 
 /*
  * prototypes for functions in sinvaladt.c
  */
-extern int     SIBackendInit(SISeg *segInOutP);
-extern int     SISegmentInit(bool killExistingSegment, IPCKey key,
+extern int     SISegmentInit(bool createNewSegment, IPCKey key,
                                                  int maxBackends);
+extern int     SIBackendInit(SISeg *segP);
 
-extern bool SISetDataEntry(SISeg *segP, SharedInvalidData *data);
-extern void SISetProcStateInvalid(SISeg *segP);
+extern bool SIInsertDataEntry(SISeg *segP, SharedInvalidData *data);
 extern int     SIGetDataEntry(SISeg *segP, int backendId,
                                                   SharedInvalidData *data);
-extern bool SIDelDataEntries(SISeg *segP, int n);
 extern void SIDelExpiredDataEntries(SISeg *segP);
 
 #endif  /* SINVALADT_H */