]> git.ipfire.org Git - thirdparty/postgresql.git/commitdiff
WAL
authorVadim B. Mikheev <vadim4o@yahoo.com>
Sat, 28 Oct 2000 16:21:00 +0000 (16:21 +0000)
committerVadim B. Mikheev <vadim4o@yahoo.com>
Sat, 28 Oct 2000 16:21:00 +0000 (16:21 +0000)
25 files changed:
src/backend/access/transam/transsup.c
src/backend/access/transam/varsup.c
src/backend/access/transam/xact.c
src/backend/access/transam/xlog.c
src/backend/access/transam/xlogutils.c
src/backend/commands/dbcommands.c
src/backend/commands/vacuum.c
src/backend/storage/buffer/bufmgr.c
src/backend/storage/buffer/localbuf.c
src/backend/storage/buffer/xlog_bufmgr.c [new file with mode: 0644]
src/backend/storage/buffer/xlog_localbuf.c [new file with mode: 0644]
src/backend/storage/file/fd.c
src/backend/storage/smgr/md.c
src/backend/storage/smgr/smgr.c
src/backend/utils/cache/relcache.c
src/backend/utils/init/postinit.c
src/include/access/transam.h
src/include/access/xact.h
src/include/access/xlog.h
src/include/access/xlogdefs.h [new file with mode: 0644]
src/include/access/xlogutils.h
src/include/storage/buf_internals.h
src/include/storage/bufmgr.h
src/include/storage/bufpage.h
src/include/storage/smgr.h

index d219f8b6841f75bd4d320b901e3a3fcdb9e29914..74e8c39eae085205d46661c9c0331cd9919b73ed 100644 (file)
@@ -8,7 +8,7 @@
  *
  *
  * IDENTIFICATION
- *       $Header: /cvsroot/pgsql/src/backend/access/transam/Attic/transsup.c,v 1.25 2000/01/26 05:56:04 momjian Exp $
+ *       $Header: /cvsroot/pgsql/src/backend/access/transam/Attic/transsup.c,v 1.26 2000/10/28 16:20:53 vadim Exp $
  *
  * NOTES
  *       This file contains support functions for the high
@@ -186,6 +186,10 @@ TransBlockGetXidStatus(Block tblock,
        bits8           bit2;
        BitIndex        offset;
 
+#ifdef XLOG
+       tblock = (Block) ((char*) tblock + sizeof(XLogRecPtr));
+#endif
+
        /* ----------------
         *      calculate the index into the transaction data where
         *      our transaction status is located
@@ -227,6 +231,10 @@ TransBlockSetXidStatus(Block tblock,
        Index           index;
        BitIndex        offset;
 
+#ifdef XLOG
+       tblock = (Block) ((char*) tblock + sizeof(XLogRecPtr));
+#endif
+
        /* ----------------
         *      calculate the index into the transaction data where
         *      we sould store our transaction status.
index 029da1d72ca8c3316941f94abed17d2f089ac4bc..49c82b55700ac6c4106aaefabf8764bef27a5263 100644 (file)
@@ -8,7 +8,7 @@
  *
  *
  * IDENTIFICATION
- *       $Header: /cvsroot/pgsql/src/backend/access/transam/varsup.c,v 1.29 2000/07/25 20:18:19 tgl Exp $
+ *       $Header: /cvsroot/pgsql/src/backend/access/transam/varsup.c,v 1.30 2000/10/28 16:20:53 vadim Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -125,7 +125,11 @@ VariableRelationPutNextXid(TransactionId xid)
 
        TransactionIdStore(xid, &(var->nextXidData));
 
+#ifdef XLOG
+       WriteBuffer(buf);       /* temp */
+#else
        FlushBuffer(buf, TRUE);
+#endif
 }
 
 /* --------------------------------
index a0476d97cffeb0236b38ec1821b95e38123c3a2b..6040b262b90c4a1d191e5676008ac1fbea7bc35a 100644 (file)
@@ -8,7 +8,7 @@
  *
  *
  * IDENTIFICATION
- *       $Header: /cvsroot/pgsql/src/backend/access/transam/xact.c,v 1.77 2000/10/24 20:06:39 tgl Exp $
+ *       $Header: /cvsroot/pgsql/src/backend/access/transam/xact.c,v 1.78 2000/10/28 16:20:53 vadim Exp $
  *
  * NOTES
  *             Transaction aborts can now occur two ways:
 
 extern bool SharedBufferChanged;
 
+void RecordTransactionCommit(void);
+
 static void AbortTransaction(void);
 static void AtAbort_Cache(void);
 static void AtAbort_Locks(void);
@@ -191,7 +193,6 @@ static void AtStart_Memory(void);
 static void CleanupTransaction(void);
 static void CommitTransaction(void);
 static void RecordTransactionAbort(void);
-static void RecordTransactionCommit(void);
 static void StartTransaction(void);
 
 /* ----------------
@@ -220,7 +221,7 @@ int                 XactIsoLevel;
 #ifdef XLOG
 #include "access/xlogutils.h"
 
-int                    CommitDelay = 100;
+int                    CommitDelay = 5;        /* 1/200 sec */
 
 void           xact_redo(XLogRecPtr lsn, XLogRecord *record);
 void           xact_undo(XLogRecPtr lsn, XLogRecord *record);
@@ -658,8 +659,8 @@ AtStart_Memory(void)
  *                       -cim 3/18/90
  * --------------------------------
  */
-static void
-RecordTransactionCommit(void)
+void
+RecordTransactionCommit()
 {
        TransactionId xid;
        int                     leak;
@@ -683,6 +684,8 @@ RecordTransactionCommit(void)
                struct timeval  delay;
                XLogRecPtr              recptr;
 
+               BufmgrCommit();
+
                xlrec.xtime = time(NULL);
                /*
                 * MUST SAVE ARRAY OF RELFILENODE-s TO DROP
index 1343743c0969561b21f189b028585cdfcb33294c..aa952a42ab47eb63757198d4ba5e38ee6405e5da 100644 (file)
@@ -6,7 +6,7 @@
  * Portions Copyright (c) 1996-2000, PostgreSQL, Inc
  * Portions Copyright (c) 1994, Regents of the University of California
  *
- * $Header: /cvsroot/pgsql/src/backend/access/transam/xlog.c,v 1.21 2000/10/24 09:56:09 vadim Exp $
+ * $Header: /cvsroot/pgsql/src/backend/access/transam/xlog.c,v 1.22 2000/10/28 16:20:54 vadim Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -220,6 +220,8 @@ static uint32 readOff = 0;
 static char readBuf[BLCKSZ];
 static XLogRecord *nextRecord = NULL;
 
+static bool InRedo = false;
+
 XLogRecPtr
 XLogInsert(RmgrId rmid, uint8 info, char *hdr, uint32 hdrlen, char *buf, uint32 buflen)
 {
@@ -481,6 +483,19 @@ XLogFlush(XLogRecPtr record)
        unsigned        i = 0;
        bool            force_lgwr = false;
 
+       if (XLOG_DEBUG)
+       {
+               fprintf(stderr, "XLogFlush%s%s: rqst %u/%u; wrt %u/%u; flsh %u/%u\n",
+                       (IsBootstrapProcessingMode()) ? "(bootstrap)" : "",
+                       (InRedo) ? "(redo)" : "",
+                       record.xlogid, record.xrecoff,
+                       LgwrResult.Write.xlogid, LgwrResult.Write.xrecoff,
+                       LgwrResult.Flush.xlogid, LgwrResult.Flush.xrecoff);
+               fflush(stderr);
+       }
+
+       if (IsBootstrapProcessingMode() || InRedo)
+               return;
        if (XLByteLE(record, LgwrResult.Flush))
                return;
        WriteRqst = LgwrRqst.Write;
@@ -894,7 +909,7 @@ ReadRecord(XLogRecPtr *RecPtr, char *buffer)
        record = (XLogRecord *) ((char *) readBuf + RecPtr->xrecoff % BLCKSZ);
 
 got_record:;
-       if (record->xl_len == 0 || record->xl_len >
+       if (record->xl_len >
                (BLCKSZ - RecPtr->xrecoff % BLCKSZ - SizeOfXLogRecord))
        {
                elog(emode, "ReadRecord: invalid record len %u in (%u, %u)",
@@ -1259,7 +1274,6 @@ StartupXLOG()
                                LastRec;
        XLogRecord *record;
        char            buffer[MAXLOGRECSZ + SizeOfXLogRecord];
-       int                     recovery = 0;
        bool            sie_saved = false;
 
 #endif
@@ -1380,16 +1394,15 @@ StartupXLOG()
                        elog(STOP, "Invalid Redo/Undo record in shutdown checkpoint");
                if (ControlFile->state == DB_SHUTDOWNED)
                        elog(STOP, "Invalid Redo/Undo record in Shutdowned state");
-               recovery = 1;
+               InRecovery = true;
        }
        else if (ControlFile->state != DB_SHUTDOWNED)
        {
-               if (checkPoint.Shutdown)
-                       elog(STOP, "Invalid state in control file");
-               recovery = 1;
+               InRecovery = true;
        }
 
-       if (recovery)
+       /* REDO */
+       if (InRecovery)
        {
                elog(LOG, "The DataBase system was not properly shut down\n"
                         "\tAutomatic recovery is in progress...");
@@ -1401,6 +1414,7 @@ StartupXLOG()
                StopIfError = true;
 
                XLogOpenLogRelation();  /* open pg_log */
+               XLogInitRelationCache();
 
                /* Is REDO required ? */
                if (XLByteLT(checkPoint.redo, RecPtr))
@@ -1409,9 +1423,9 @@ StartupXLOG()
 /* read past CheckPoint record */
                        record = ReadRecord(NULL, buffer);
 
-               /* REDO */
                if (record->xl_len != 0)
                {
+                       InRedo = true;
                        elog(LOG, "Redo starts at (%u, %u)",
                                 ReadRecPtr.xlogid, ReadRecPtr.xrecoff);
                        do
@@ -1441,12 +1455,40 @@ StartupXLOG()
                        elog(LOG, "Redo done at (%u, %u)",
                                 ReadRecPtr.xlogid, ReadRecPtr.xrecoff);
                        LastRec = ReadRecPtr;
+                       InRedo = false;
                }
                else
                        elog(LOG, "Redo is not required");
+       }
+
+       /* Init xlog buffer cache */
+       record = ReadRecord(&LastRec, buffer);
+       logId = EndRecPtr.xlogid;
+       logSeg = (EndRecPtr.xrecoff - 1) / XLogSegSize;
+       logOff = 0;
+       logFile = XLogFileOpen(logId, logSeg, false);
+       XLogCtl->xlblocks[0].xlogid = logId;
+       XLogCtl->xlblocks[0].xrecoff =
+               ((EndRecPtr.xrecoff - 1) / BLCKSZ + 1) * BLCKSZ;
+       Insert = &XLogCtl->Insert;
+       memcpy((char *) (Insert->currpage), readBuf, BLCKSZ);
+       Insert->currpos = ((char *) Insert->currpage) +
+               (EndRecPtr.xrecoff + BLCKSZ - XLogCtl->xlblocks[0].xrecoff);
+       Insert->PrevRecord = LastRec;
+
+       LgwrRqst.Write = LgwrRqst.Flush =
+       LgwrResult.Write = LgwrResult.Flush = EndRecPtr;
+
+       XLogCtl->Write.LgwrResult = LgwrResult;
+       Insert->LgwrResult = LgwrResult;
+
+       XLogCtl->LgwrRqst = LgwrRqst;
+       XLogCtl->LgwrResult = LgwrResult;
 
 #ifdef NOT_USED
-               /* UNDO */
+       /* UNDO */
+       if (InRecovery)
+       {
                RecPtr = ReadRecPtr;
                if (XLByteLT(checkPoint.undo, RecPtr))
                {
@@ -1465,29 +1507,16 @@ StartupXLOG()
                }
                else
                        elog(LOG, "Undo is not required");
-#endif
        }
+#endif
 
-       /* Init xlog buffer cache */
-       record = ReadRecord(&LastRec, buffer);
-       logId = EndRecPtr.xlogid;
-       logSeg = (EndRecPtr.xrecoff - 1) / XLogSegSize;
-       logOff = 0;
-       logFile = XLogFileOpen(logId, logSeg, false);
-       XLogCtl->xlblocks[0].xlogid = logId;
-       XLogCtl->xlblocks[0].xrecoff =
-               ((EndRecPtr.xrecoff - 1) / BLCKSZ + 1) * BLCKSZ;
-       Insert = &XLogCtl->Insert;
-       memcpy((char *) (Insert->currpage), readBuf, BLCKSZ);
-       Insert->currpos = ((char *) Insert->currpage) +
-               (EndRecPtr.xrecoff + BLCKSZ - XLogCtl->xlblocks[0].xrecoff);
-       Insert->PrevRecord = ControlFile->checkPoint;
-
-       if (recovery)
+       if (InRecovery)
        {
                CreateCheckPoint(true);
                StopIfError = sie_saved;
+               XLogCloseRelationCache();
        }
+       InRecovery = false;
 
 #endif  /* XLOG */
 
index 2800ff0316f43cf33131f0c4a6ef9fb46f29e4ff..3d15033b940298607fefcc7450cbddebf03a6c8c 100644 (file)
@@ -22,6 +22,7 @@
 #include "access/htup.h"
 #include "access/xlogutils.h"
 #include "catalog/pg_database.h"
+#include "lib/hasht.h"
 
 /*
  * ---------------------------------------------------------------
@@ -240,32 +241,10 @@ static int                                        _xlcnt = 0;
 #define        _XLOG_INITRELCACHESIZE  32
 #define        _XLOG_MAXRELCACHESIZE   512
 
-void
-XLogCloseRelationCache(void)
-{
-       int i;
-
-       if (!_xlrelarr)
-               return;
-
-       for (i = 1; i < _xlast; i++)
-       {
-               Relation        reln = &(_xlrelarr[i].reldata);
-               if (reln->rd_fd >= 0)
-                       smgrclose(DEFAULT_SMGR, reln);
-       }
-
-       free(_xlrelarr);
-       free(_xlpgcarr);
-
-       hash_destroy(_xlrelcache);
-       _xlrelarr = NULL;
-}
-
 static void
 _xl_init_rel_cache(void)
 {
-       HASHCTL ctl;
+       HASHCTL                 ctl;
 
        _xlcnt = _XLOG_INITRELCACHESIZE;
        _xlast = 0;
@@ -286,6 +265,35 @@ _xl_init_rel_cache(void)
                                                                HASH_ELEM | HASH_FUNCTION);
 }
 
+static void
+_xl_remove_hash_entry(XLogRelDesc **edata, int dummy)
+{      /*
+        * Evict one relation descriptor from the recovery-time relation
+        * cache and leave its slot zeroed.
+        *
+        * edata - address of a pointer to the descriptor to evict.
+        * dummy - unused; the two-argument shape lets this function be
+        *         cast to HashtFunc and handed to HashTableWalk() (see
+        *         XLogCloseRelationCache).
+        *
+        * Raises elog(STOP) if the descriptor cannot be removed from
+        * _xlrelcache.
+        *
+        * NOTE(review): when invoked through HashTableWalk() this deletes
+        * entries from the table that is being walked - confirm the hash
+        * implementation tolerates that.
+        */
+       XLogRelCacheEntry          *hentry;
+       bool                                    found;
+       XLogRelDesc                        *rdesc = *edata;
+       Form_pg_class                   tpgc = rdesc->reldata.rd_rel;
+       /* unlink the descriptor from the doubly linked recency list */
+       rdesc->lessRecently->moreRecently = rdesc->moreRecently;
+       rdesc->moreRecently->lessRecently = rdesc->lessRecently;
+       /* drop the hash entry, which is keyed by the relation's rd_node */
+       hentry = (XLogRelCacheEntry*) hash_search(_xlrelcache, 
+               (char*)&(rdesc->reldata.rd_node), HASH_REMOVE, &found);
+
+       if (hentry == NULL)
+               elog(STOP, "_xl_remove_hash_entry: can't delete from cache");
+       if (!found)
+               elog(STOP, "_xl_remove_hash_entry: file was not found in cache");
+       /* close the underlying file only if the descriptor has one open */
+       if (rdesc->reldata.rd_fd >= 0)
+               smgrclose(DEFAULT_SMGR, &(rdesc->reldata));
+       /*
+        * Wipe both the descriptor and the pg_class struct it pointed to,
+        * then re-attach the (now zeroed) pg_class struct so the slot
+        * keeps its storage.
+        */
+       memset(rdesc, 0, sizeof(XLogRelDesc));
+       memset(tpgc, 0, sizeof(FormData_pg_class));
+       rdesc->reldata.rd_rel = tpgc;
+
+       return;
+}
+
 static XLogRelDesc*
 _xl_new_reldesc(void)
 {
@@ -310,32 +318,41 @@ _xl_new_reldesc(void)
        }
        else /* reuse */
        {
-               XLogRelCacheEntry          *hentry;
-               bool                                    found;
-               XLogRelDesc                        *res = _xlrelarr[0].moreRecently;
-               Form_pg_class                   tpgc = res->reldata.rd_rel;
+               XLogRelDesc        *res = _xlrelarr[0].moreRecently;
 
-               res->lessRecently->moreRecently = res->moreRecently;
-               res->moreRecently->lessRecently = res->lessRecently;
+               _xl_remove_hash_entry(&res, 0);
 
-               hentry = (XLogRelCacheEntry*) hash_search(_xlrelcache, 
-                       (char*)&(res->reldata.rd_node), HASH_REMOVE, &found);
+               _xlast--;
+               return(res);
+       }
+}
 
-               if (hentry == NULL)
-                       elog(STOP, "XLogOpenRelation: can't delete from cache");
-               if (!found)
-                       elog(STOP, "XLogOpenRelation: file was not found in cache");
+extern void CreateDummyCaches(void);
+extern void DestroyDummyCaches(void);
 
-               if (res->reldata.rd_fd >= 0)
-                       smgrclose(DEFAULT_SMGR, &(res->reldata));
+void
+XLogInitRelationCache(void)
+{      /*
+        * Set up the relation cache used while replaying the log.
+        *
+        * XLogOpenRelation() no longer calls _xl_init_rel_cache() lazily
+        * (that check is removed in this same change), so this routine has
+        * to run before the first XLogOpenRelation() call.  The matching
+        * teardown is XLogCloseRelationCache().
+        */
+       CreateDummyCaches();            /* extern, declared just above; defined elsewhere */
+       _xl_init_rel_cache();
+}
 
-               memset(res, 0, sizeof(XLogRelDesc));
-               memset(tpgc, 0, sizeof(FormData_pg_class));
-               res->reldata.rd_rel = tpgc;
+void
+XLogCloseRelationCache(void)
+{
 
-               _xlast--;
-               return(res);
-       }
+       DestroyDummyCaches();
+
+       if (!_xlrelarr)
+               return;
+
+       HashTableWalk(_xlrelcache, (HashtFunc)_xl_remove_hash_entry, 0);
+       hash_destroy(_xlrelcache);
+
+       free(_xlrelarr);
+       free(_xlpgcarr);
+
+       _xlrelarr = NULL;
 }
 
 Relation
@@ -345,9 +362,6 @@ XLogOpenRelation(bool redo, RmgrId rmid, RelFileNode rnode)
        XLogRelCacheEntry          *hentry;
        bool                                    found;
 
-       if (!_xlrelarr)
-               _xl_init_rel_cache();
-
        hentry = (XLogRelCacheEntry*) 
                        hash_search(_xlrelcache, (char*)&rnode, HASH_FIND, &found);
 
index d68033d897526b643cb3207e655523c482a8f74a..802e68670153a870d49be0acd97d278704789d4f 100644 (file)
@@ -8,7 +8,7 @@
  *
  *
  * IDENTIFICATION
- *       $Header: /cvsroot/pgsql/src/backend/commands/dbcommands.c,v 1.62 2000/10/22 17:55:36 pjw Exp $
+ *       $Header: /cvsroot/pgsql/src/backend/commands/dbcommands.c,v 1.63 2000/10/28 16:20:54 vadim Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -193,6 +193,9 @@ createdb(const char *dbname, const char *dbpath, int encoding)
                        elog(ERROR, "CREATE DATABASE: Could not initialize database directory. Delete failed as well");
        }
 
+#ifdef XLOG
+       BufferSync();
+#endif
 }
 
 
index 0905f60b8077f9b652f85d671973a9814de07d9a..3976cb1ab50cf82ffebe8cfbb9b48f7bea4de0fd 100644 (file)
@@ -8,7 +8,7 @@
  *
  *
  * IDENTIFICATION
- *       $Header: /cvsroot/pgsql/src/backend/commands/vacuum.c,v 1.170 2000/10/24 09:56:15 vadim Exp $
+ *       $Header: /cvsroot/pgsql/src/backend/commands/vacuum.c,v 1.171 2000/10/28 16:20:54 vadim Exp $
  *
 
  *-------------------------------------------------------------------------
@@ -1787,7 +1787,9 @@ failed to add item with len = %u to page %u (free space %u, nusd %u, noff %u)",
 
        if (num_moved > 0)
        {
-
+#ifdef XLOG
+               RecordTransactionCommit();
+#else
                /*
                 * We have to commit our tuple' movings before we'll truncate
                 * relation, but we shouldn't lose our locks. And so - quick hack:
@@ -1797,6 +1799,7 @@ failed to add item with len = %u to page %u (free space %u, nusd %u, noff %u)",
                FlushBufferPool();
                TransactionIdCommit(myXID);
                FlushBufferPool();
+#endif
        }
 
        /*
index c0a320986ce9c670756ad66280d59ad654a705bf..9c9bda5035c6685b88bae9a9edfddd9e19b3f4a8 100644 (file)
@@ -8,7 +8,7 @@
  *
  *
  * IDENTIFICATION
- *       $Header: /cvsroot/pgsql/src/backend/storage/buffer/bufmgr.c,v 1.91 2000/10/23 04:10:06 vadim Exp $
+ *       $Header: /cvsroot/pgsql/src/backend/storage/buffer/bufmgr.c,v 1.92 2000/10/28 16:20:55 vadim Exp $
  *
  *-------------------------------------------------------------------------
  */
  *             freelist.c -- chooses victim for buffer replacement
  *             buf_table.c -- manages the buffer lookup table
  */
+
+#ifdef XLOG
+
+#include "xlog_bufmgr.c"
+
+#else
+
 #include <sys/types.h>
 #include <sys/file.h>
 #include <math.h>
@@ -2512,3 +2519,5 @@ MarkBufferForCleanup(Buffer buffer, void (*CleanupFunc)(Buffer))
        SpinRelease(BufMgrLock);
        return;
 }
+
+#endif /* ! XLOG */
index 1d6a416e48e2c95aa2cd9c2512de7dc540dce717..faa3304b4f66d9d5fcc97af2419a955761ca37d5 100644 (file)
  *
  *
  * IDENTIFICATION
- *       $Header: /cvsroot/pgsql/src/backend/storage/buffer/localbuf.c,v 1.32 2000/10/23 04:10:06 vadim Exp $
+ *       $Header: /cvsroot/pgsql/src/backend/storage/buffer/localbuf.c,v 1.33 2000/10/28 16:20:56 vadim Exp $
  *
  *-------------------------------------------------------------------------
  */
+   
+#ifdef XLOG
+
+#include "xlog_localbuf.c"
+
+#else
+
 #include <sys/types.h>
 #include <sys/file.h>
 #include <math.h>
@@ -247,10 +254,11 @@ InitLocalBuffer(void)
 }
 
 /*
- * LocalBufferSync -
- *       flush all dirty buffers in the local buffer cache. Since the buffer
- *       cache is only used for keeping relations visible during a transaction,
- *       we will not need these buffers again.
+ * LocalBufferSync
+ *
+ * Flush all dirty buffers in the local buffer cache at commit time.
+ * Since the buffer cache is only used for keeping relations visible
+ * during a transaction, we will not need these buffers again.
  */
 void
 LocalBufferSync(void)
@@ -303,3 +311,5 @@ ResetLocalBufferPool(void)
        MemSet(LocalRefCount, 0, sizeof(long) * NLocBuffer);
        nextFreeLocalBuf = 0;
 }
+
+#endif /* XLOG */
diff --git a/src/backend/storage/buffer/xlog_bufmgr.c b/src/backend/storage/buffer/xlog_bufmgr.c
new file mode 100644 (file)
index 0000000..dcd377b
--- /dev/null
@@ -0,0 +1,2205 @@
+/*-------------------------------------------------------------------------
+ *
+ * bufmgr.c
+ *       buffer manager interface routines
+ *
+ * Portions Copyright (c) 1996-2000, PostgreSQL, Inc
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ *
+ * IDENTIFICATION
+ *       $Header: /cvsroot/pgsql/src/backend/storage/buffer/Attic/xlog_bufmgr.c,v 1.1 2000/10/28 16:20:56 vadim Exp $
+ *
+ *-------------------------------------------------------------------------
+ */
+/*
+ *
+ * BufferAlloc() -- lookup a buffer in the buffer table.  If
+ *             it isn't there add it, but do not read data into memory.
+ *             This is used when we are about to reinitialize the
+ *             buffer so don't care what the current disk contents are.
+ *             BufferAlloc() also pins the new buffer in memory.
+ *
+ * ReadBuffer() -- like BufferAlloc() but reads the data
+ *             on a buffer cache miss.
+ *
+ * ReleaseBuffer() -- unpin the buffer
+ *
+ * WriteNoReleaseBuffer() -- mark the buffer contents as "dirty"
+ *             but don't unpin.  The disk IO is delayed until buffer
+ *             replacement.
+ *
+ * WriteBuffer() -- WriteNoReleaseBuffer() + ReleaseBuffer()
+ *
+ * BufferSync() -- flush all dirty buffers in the buffer pool.
+ *
+ * InitBufferPool() -- Init the buffer module.
+ *
+ * See other files:
+ *             freelist.c -- chooses victim for buffer replacement
+ *             buf_table.c -- manages the buffer lookup table
+ */
+#include <sys/types.h>
+#include <sys/file.h>
+#include <math.h>
+#include <signal.h>
+
+#include "postgres.h"
+#include "executor/execdebug.h"
+#include "miscadmin.h"
+#include "storage/s_lock.h"
+#include "storage/smgr.h"
+#include "utils/relcache.h"
+
+#ifdef XLOG
+#include "catalog/pg_database.h"
+#endif
+
+#define BufferGetLSN(bufHdr)   \
+       (*((XLogRecPtr*)MAKE_PTR((bufHdr)->data)))
+
+
+extern SPINLOCK BufMgrLock;
+extern long int ReadBufferCount;
+extern long int ReadLocalBufferCount;
+extern long int BufferHitCount;
+extern long int LocalBufferHitCount;
+extern long int BufferFlushCount;
+extern long int LocalBufferFlushCount;
+
+/*
+ * It's used to avoid disk writes for read-only transactions
+ * (i.e. when no one shared buffer was changed by transaction).
+ * We set it to true in WriteBuffer/WriteNoReleaseBuffer when
+ * marking shared buffer as dirty. We set it to false in xact.c
+ * after transaction is committed/aborted.
+ */
+bool           SharedBufferChanged = false;
+
+static void WaitIO(BufferDesc *buf, SPINLOCK spinlock);
+static void StartBufferIO(BufferDesc *buf, bool forInput);
+static void TerminateBufferIO(BufferDesc *buf);
+static void ContinueBufferIO(BufferDesc *buf, bool forInput);
+extern void AbortBufferIO(void);
+
+/*
+ * Macro : BUFFER_IS_BROKEN
+ *             Note that write error doesn't mean the buffer broken
+*/
+#define BUFFER_IS_BROKEN(buf) ((buf->flags & BM_IO_ERROR) && !(buf->flags & BM_DIRTY))
+
+#ifndef HAS_TEST_AND_SET
+static void SignalIO(BufferDesc *buf);
+extern long *NWaitIOBackendP;  /* defined in buf_init.c */
+
+#endif  /* HAS_TEST_AND_SET */
+
+static Buffer ReadBufferWithBufferLock(Relation relation, BlockNumber blockNum,
+                                                bool bufferLockHeld);
+static BufferDesc *BufferAlloc(Relation reln, BlockNumber blockNum,
+                       bool *foundPtr, bool bufferLockHeld);
+static int     BufferReplace(BufferDesc *bufHdr);
+void           PrintBufferDescs(void);
+
+/* ---------------------------------------------------
+ * RelationGetBufferWithBuffer
+ *             see if the given buffer is what we want
+ *             if yes, we don't need to bother the buffer manager
+ *
+ * relation    - relation whose block is wanted
+ * blockNumber - block wanted
+ * buffer      - buffer the caller already has; may be invalid
+ *
+ * Returns `buffer` itself when its tag (block number and rnode)
+ * already matches the request; otherwise returns whatever
+ * ReadBufferWithBufferLock()/ReadBuffer() returns for the block.
+ *
+ * On a mismatch this function does not release `buffer`; no
+ * ReleaseBuffer() call is made here.
+ * ---------------------------------------------------
+ */
+Buffer
+RelationGetBufferWithBuffer(Relation relation,
+                                                       BlockNumber blockNumber,
+                                                       Buffer buffer)
+{
+       BufferDesc *bufHdr;
+
+       if (BufferIsValid(buffer))
+       {
+               if (!BufferIsLocal(buffer))
+               {
+                       /* shared buffer numbers are 1-based indexes into BufferDescriptors */
+                       bufHdr = &BufferDescriptors[buffer - 1];
+                       SpinAcquire(BufMgrLock);
+                       if (bufHdr->tag.blockNum == blockNumber &&
+                               RelFileNodeEquals(bufHdr->tag.rnode, relation->rd_node))
+                       {
+                               SpinRelease(BufMgrLock);
+                               return buffer;
+                       }
+                       /* mismatch: BufMgrLock is still held and is handed over to the callee */
+                       return ReadBufferWithBufferLock(relation, blockNumber, true);
+               }
+               else
+               {
+                       /* local buffer numbers are negative; the tag is compared without BufMgrLock */
+                       bufHdr = &LocalBufferDescriptors[-buffer - 1];
+                       if (bufHdr->tag.blockNum == blockNumber &&
+                               RelFileNodeEquals(bufHdr->tag.rnode, relation->rd_node))
+                               return buffer;
+               }
+       }
+       return ReadBuffer(relation, blockNumber);
+}
+
+/*
+ * ReadBuffer -- returns a buffer containing the requested
+ *             block of the requested relation.  If the blknum
+ *             requested is P_NEW, extend the relation file and
+ *             allocate a new block.
+ *
+ * Returns: the buffer number for the buffer containing
+ *             the block read, or InvalidBuffer on an error.
+ *
+ * Assume when this function is called, that reln has been
+ *             opened already.
+ */
+
+#undef ReadBuffer                              /* conflicts with macro when BUFMGR_DEBUG
+                                                                * defined */
+
+/*
+ * ReadBuffer
+ *
+ * Thin wrapper: does the read through ReadBufferWithBufferLock()
+ * with bufferLockHeld = false, i.e. the caller is not holding
+ * BufMgrLock.
+ */
+Buffer
+ReadBuffer(Relation reln, BlockNumber blockNum)
+{
+       return ReadBufferWithBufferLock(reln, blockNum, false);
+}
+
+/*
+ * ReadBufferWithBufferLock -- does the work of
+ *             ReadBuffer() but with the possibility that
+ *             the buffer lock has already been held. this
+ *             is yet another effort to reduce the number of
+ *             semops in the system.
+ *
+ * reln           - relation to read from
+ * blockNum       - block wanted, or P_NEW to extend the relation
+ * bufferLockHeld - true if the caller already holds BufMgrLock; the
+ *                  flag is simply forwarded to BufferAlloc()
+ *
+ * Returns the buffer number, or InvalidBuffer when no buffer header
+ * could be obtained or when the storage manager reports SM_FAIL for
+ * a shared buffer.
+ *
+ * Relations with rd_myxactonly set go through LocalBufferAlloc()
+ * and never touch BufMgrLock in this function.
+ *
+ * NOTE(review): if bufferLockHeld is true and the relation uses
+ * local buffers, nothing visible here releases BufMgrLock - confirm
+ * callers cannot hit that combination.
+ */
+static Buffer
+ReadBufferWithBufferLock(Relation reln,
+                                                BlockNumber blockNum,
+                                                bool bufferLockHeld)
+{
+       BufferDesc *bufHdr;
+       int                     extend;                 /* extending the file by one block */
+       int                     status;
+       bool            found;
+       bool            isLocalBuf;
+
+       extend = (blockNum == P_NEW);
+       isLocalBuf = reln->rd_myxactonly;
+
+       if (isLocalBuf)
+       {
+               ReadLocalBufferCount++;
+               bufHdr = LocalBufferAlloc(reln, blockNum, &found);
+               if (found)
+                       LocalBufferHitCount++;
+       }
+       else
+       {
+               ReadBufferCount++;
+
+               /*
+                * lookup the buffer.  IO_IN_PROGRESS is set if the requested
+                * block is not currently in memory.
+                */
+               bufHdr = BufferAlloc(reln, blockNum, &found, bufferLockHeld);
+               if (found)
+                       BufferHitCount++;
+       }
+
+       if (!bufHdr)
+               return InvalidBuffer;
+
+       /* if it's already in the buffer pool, we're done */
+       if (found)
+       {
+
+               /*
+                * This happens when a bogus buffer was returned previously and is
+                * floating around in the buffer pool.  A routine calling this
+                * would want this extended.
+                */
+               if (extend)
+               {
+                       /*
+                        * new buffers are zero-filled
+                        *
+                        * NOTE(review): the result of smgrextend() is not
+                        * checked on this path, unlike the not-found path
+                        * below - confirm that is intended.
+                        */
+                       MemSet((char *) MAKE_PTR(bufHdr->data), 0, BLCKSZ);
+                       smgrextend(DEFAULT_SMGR, reln,
+                                          (char *) MAKE_PTR(bufHdr->data));
+               }
+               return BufferDescriptorGetBuffer(bufHdr);
+
+       }
+
+       /*
+        * if we have gotten to this point, the reln pointer must be ok and
+        * the relation file must be open.
+        *
+        * The buffer was not found, so fill it: either extend the file
+        * with a zeroed page or read the existing block from storage.
+        */
+       if (extend)
+       {
+               /* new buffers are zero-filled */
+               MemSet((char *) MAKE_PTR(bufHdr->data), 0, BLCKSZ);
+               status = smgrextend(DEFAULT_SMGR, reln,
+                                                       (char *) MAKE_PTR(bufHdr->data));
+       }
+       else
+       {
+               status = smgrread(DEFAULT_SMGR, reln, blockNum,
+                                                 (char *) MAKE_PTR(bufHdr->data));
+       }
+       /*
+        * Local buffers skip the shared IO-flag bookkeeping below.
+        *
+        * NOTE(review): `status` is not examined before this return, so a
+        * failed read/extend of a local buffer still yields a buffer
+        * number - confirm that is intended.
+        */
+       if (isLocalBuf)
+               return BufferDescriptorGetBuffer(bufHdr);
+
+       /* lock buffer manager again to update IO IN PROGRESS */
+       SpinAcquire(BufMgrLock);
+
+       if (status == SM_FAIL)
+       {
+               /* IO Failed.  cleanup the data structures and go home */
+
+               if (!BufTableDelete(bufHdr))
+               {
+                       SpinRelease(BufMgrLock);
+                       elog(FATAL, "BufRead: buffer table broken after IO error\n");
+               }
+               /* remember that BufferAlloc() pinned the buffer */
+               UnpinBuffer(bufHdr);
+
+               /*
+                * Have to reset the flag so that anyone waiting for the buffer
+                * can tell that the contents are invalid.
+                */
+               bufHdr->flags |= BM_IO_ERROR;
+               bufHdr->flags &= ~BM_IO_IN_PROGRESS;
+       }
+       else
+       {
+               /* IO Succeeded.  clear the flags, finish buffer update */
+
+               bufHdr->flags &= ~(BM_IO_ERROR | BM_IO_IN_PROGRESS);
+       }
+
+       /* If anyone was waiting for IO to complete, wake them up now */
+       TerminateBufferIO(bufHdr);
+
+       SpinRelease(BufMgrLock);
+
+       if (status == SM_FAIL)
+               return InvalidBuffer;
+
+       return BufferDescriptorGetBuffer(bufHdr);
+}
+
+/*
+ * BufferAlloc -- Get a buffer from the buffer pool but dont
+ *             read it.
+ *
+ * Returns: descriptor for buffer
+ *
+ * When this routine returns, the BufMgrLock is guaranteed NOT be held.
+ */
+static BufferDesc *
+BufferAlloc(Relation reln,
+                       BlockNumber blockNum,
+                       bool *foundPtr,
+                       bool bufferLockHeld)
+{
+       /*
+        * reln           - relation the block belongs to (assumed already open)
+        * blockNum       - block wanted, or P_NEW (see below)
+        * foundPtr       - out: TRUE if the block was found in the buffer table
+        *                  and is not BROKEN; FALSE if the caller must fill it
+        * bufferLockHeld - true if the caller already holds BufMgrLock
+        *
+        * Every return path below releases BufMgrLock first.
+        */
+       BufferDesc *buf,
+                          *buf2;
+       BufferTag       newTag;                 /* identity of requested block */
+       bool            inProgress;             /* buffer undergoing IO */
+       bool            newblock = FALSE;
+
+       /* create a new tag so we can lookup the buffer */
+       /* assume that the relation is already open */
+       if (blockNum == P_NEW)
+       {
+               /*
+                * P_NEW: use the block count reported by smgrnblocks as the
+                * block number.  NOTE(review): newblock is set here but is not
+                * read anywhere else in this function.
+                */
+               newblock = TRUE;
+               blockNum = smgrnblocks(DEFAULT_SMGR, reln);
+       }
+
+       INIT_BUFFERTAG(&newTag, reln, blockNum);
+
+       /* take BufMgrLock unless the caller told us it is already held */
+       if (!bufferLockHeld)
+               SpinAcquire(BufMgrLock);
+
+       /* see if the block is in the buffer pool already */
+       buf = BufTableLookup(&newTag);
+       if (buf != NULL)
+       {
+
+               /*
+                * Found it.  Now, (a) pin the buffer so no one steals it from the
+                * buffer pool, (b) check IO_IN_PROGRESS, someone may be faulting
+                * the buffer into the buffer pool.
+                */
+
+               PinBuffer(buf);
+               inProgress = (buf->flags & BM_IO_IN_PROGRESS);
+
+               *foundPtr = TRUE;
+               if (inProgress)                 /* confirm end of IO */
+               {
+                       WaitIO(buf, BufMgrLock);
+                       /* value re-read here is not consulted again on this path */
+                       inProgress = (buf->flags & BM_IO_IN_PROGRESS);
+               }
+               if (BUFFER_IS_BROKEN(buf))
+               {
+
+                       /*
+                        * I couldn't understand the following old comment. If there's
+                        * no IO for the buffer and the buffer is BROKEN, it should be
+                        * read again. So start a new buffer IO here.
+                        *
+                        * weird race condition:
+                        *
+                        * We were waiting for someone else to read the buffer. While we
+                        * were waiting, the reader boof'd in some way, so the
+                        * contents of the buffer are still invalid.  By saying that
+                        * we didn't find it, we can make the caller reinitialize the
+                        * buffer.      If two processes are waiting for this block, both
+                        * will read the block.  The second one to finish may
+                        * overwrite any updates made by the first.  (Assume higher
+                        * level synchronization prevents this from happening).
+                        *
+                        * This is never going to happen, don't worry about it.
+                        */
+                       *foundPtr = FALSE;
+               }
+#ifdef BMTRACE
+               _bm_trace((reln->rd_rel->relisshared ? 0 : MyDatabaseId), RelationGetRelid(reln), blockNum, BufferDescriptorGetBuffer(buf), BMT_ALLOCFND);
+#endif  /* BMTRACE */
+
+               /* a BROKEN buffer is handed back with a fresh IO started on it */
+               if (!(*foundPtr))
+                       StartBufferIO(buf, true);
+               SpinRelease(BufMgrLock);
+
+               return buf;
+       }
+
+       *foundPtr = FALSE;
+
+       /*
+        * Didn't find it in the buffer pool.  We'll have to initialize a new
+        * buffer.      First, grab one from the free list.  If it's dirty, flush
+        * it to disk. Remember to unlock BufMgr spinlock while doing the IOs.
+        *
+        * The loop repeats until buf is non-NULL; every path that gives up on
+        * a candidate buffer resets buf to NULL.
+        */
+       inProgress = FALSE;
+       for (buf = (BufferDesc *) NULL; buf == (BufferDesc *) NULL;)
+       {
+               buf = GetFreeBuffer();
+
+               /* GetFreeBuffer will abort if it can't find a free buffer */
+               Assert(buf);
+
+               /*
+                * There should be exactly one pin on the buffer after it is
+                * allocated -- ours.  If it had a pin it wouldn't have been on
+                * the free list.  No one else could have pinned it between
+                * GetFreeBuffer and here because we have the BufMgrLock.
+                */
+               Assert(buf->refcount == 0);
+               buf->refcount = 1;
+               PrivateRefCount[BufferDescriptorGetBuffer(buf) - 1] = 1;
+
+               if (buf->flags & BM_DIRTY || buf->cntxDirty)
+               {
+                       bool            smok;
+
+                       /*
+                        * Skip buffers that already have a write error recorded:
+                        * drop our pin and try another one.  NOTE(review): the
+                        * buffer is not put back on the free list on this path.
+                        */
+                       if ((buf->flags & BM_IO_ERROR) != 0)
+                       {
+                               PrivateRefCount[BufferDescriptorGetBuffer(buf) - 1] = 0;
+                               buf->refcount--;
+                               buf = (BufferDesc *) NULL;
+                               continue;
+                       }
+                       /*
+                        * Set BM_IO_IN_PROGRESS to keep anyone from doing anything
+                        * with the contents of the buffer while we write it out. We
+                        * don't really care if they try to read it, but if they can
+                        * complete a BufferAlloc on it they can then scribble into
+                        * it, and we'd really like to avoid that while we are
+                        * flushing the buffer.  Setting this flag should block them
+                        * in WaitIO until we're done.
+                        */
+                       inProgress = TRUE;
+
+                       /*
+                        * All code paths that acquire this lock pin the buffer first;
+                        * since no one had it pinned (it just came off the free
+                        * list), no one else can have this lock.
+                        */
+                       StartBufferIO(buf, false);
+
+                       /*
+                        * Write the buffer out, being careful to release BufMgrLock
+                        * before starting the I/O.  (BufferReplace drops the lock
+                        * and re-acquires it before returning.)
+                        */
+                       smok = BufferReplace(buf);
+
+                       if (smok == FALSE)
+                       {
+                               /*
+                                * Write failed: remember the error on the buffer, end
+                                * the IO, drop our pin, and go look for another buffer.
+                                */
+                               elog(NOTICE, "BufferAlloc: cannot write block %u for %s/%s",
+                               buf->tag.blockNum, buf->blind.dbname, buf->blind.relname);
+                               inProgress = FALSE;
+                               buf->flags |= BM_IO_ERROR;
+                               buf->flags &= ~BM_IO_IN_PROGRESS;
+                               TerminateBufferIO(buf);
+                               PrivateRefCount[BufferDescriptorGetBuffer(buf) - 1] = 0;
+                               Assert(buf->refcount > 0);
+                               buf->refcount--;
+                               if (buf->refcount == 0)
+                               {
+                                       AddBufferToFreelist(buf);
+                                       buf->flags |= BM_FREE;
+                               }
+                               buf = (BufferDesc *) NULL;
+                       }
+                       else
+                       {
+                               /*
+                                * BM_JUST_DIRTIED is cleared by BufferReplace and
+                                * shouldn't be set by anyone.          - vadim 01/17/97
+                                */
+                               if (buf->flags & BM_JUST_DIRTIED)
+                               {
+                                       elog(STOP, "BufferAlloc: content of block %u (%s) changed while flushing",
+                                                buf->tag.blockNum, buf->blind.relname);
+                               }
+                               else
+                                       buf->flags &= ~BM_DIRTY;
+                               buf->cntxDirty = false;
+                       }
+
+                       /*
+                        * Somebody could have pinned the buffer while we were doing
+                        * the I/O and had given up the BufMgrLock (though they would
+                        * be waiting for us to clear the BM_IO_IN_PROGRESS flag).
+                        * That's why this is a loop -- if so, we need to clear the
+                        * I/O flags, remove our pin and start all over again.
+                        *
+                        * People may be making buffers free at any time, so there's no
+                        * reason to think that we have an immediate disaster on our
+                        * hands.
+                        */
+                       if (buf && buf->refcount > 1)
+                       {
+                               inProgress = FALSE;
+                               buf->flags &= ~BM_IO_IN_PROGRESS;
+                               TerminateBufferIO(buf);
+                               PrivateRefCount[BufferDescriptorGetBuffer(buf) - 1] = 0;
+                               buf->refcount--;
+                               buf = (BufferDesc *) NULL;
+                       }
+
+                       /*
+                        * Somebody could have allocated another buffer for the same
+                        * block we are about to read in. (While we flush out the
+                        * dirty buffer, we don't hold the lock and someone could have
+                        * allocated another buffer for the same block. The problem is
+                        * we haven't gotten around to insert the new tag into the
+                        * buffer table. So we need to check here.              -ay 3/95
+                        *
+                        * This recheck is only made on the dirty-buffer path, which
+                        * is the only path in this loop that goes through
+                        * BufferReplace.
+                        */
+                       buf2 = BufTableLookup(&newTag);
+                       if (buf2 != NULL)
+                       {
+
+                               /*
+                                * Found it. Someone has already done what we're about to
+                                * do. We'll just handle this as if it were found in the
+                                * buffer pool in the first place.
+                                */
+                               if (buf != NULL)
+                               {
+                                       buf->flags &= ~BM_IO_IN_PROGRESS;
+                                       TerminateBufferIO(buf);
+                                       /* give up the buffer since we don't need it any more */
+                                       PrivateRefCount[BufferDescriptorGetBuffer(buf) - 1] = 0;
+                                       Assert(buf->refcount > 0);
+                                       buf->refcount--;
+                                       if (buf->refcount == 0)
+                                       {
+                                               AddBufferToFreelist(buf);
+                                               buf->flags |= BM_FREE;
+                                       }
+                               }
+
+                               /* same handling as the "found" case at the top */
+                               PinBuffer(buf2);
+                               inProgress = (buf2->flags & BM_IO_IN_PROGRESS);
+
+                               *foundPtr = TRUE;
+                               if (inProgress)
+                               {
+                                       WaitIO(buf2, BufMgrLock);
+                                       inProgress = (buf2->flags & BM_IO_IN_PROGRESS);
+                               }
+                               if (BUFFER_IS_BROKEN(buf2))
+                                       *foundPtr = FALSE;
+
+                               if (!(*foundPtr))
+                                       StartBufferIO(buf2, true);
+                               SpinRelease(BufMgrLock);
+
+                               return buf2;
+                       }
+               }
+       }
+
+       /*
+        * At this point we should have the sole pin on a non-dirty buffer and
+        * we may or may not already have the BM_IO_IN_PROGRESS flag set.
+        */
+
+       /*
+        * Change the name of the buffer in the lookup table:
+        *
+        * Need to update the lookup table before the read starts. If someone
+        * comes along looking for the buffer while we are reading it in, we
+        * don't want them to allocate a new buffer.  For the same reason, we
+        * didn't want to erase the buf table entry for the buffer we were
+        * writing back until now, either.
+        */
+
+       if (!BufTableDelete(buf))
+       {
+               SpinRelease(BufMgrLock);
+               elog(FATAL, "buffer wasn't in the buffer table\n");
+       }
+
+       /*
+        * Record the database name and relation name for this buffer.  When
+        * DatabaseName is not set, "Recovery" is stored instead.
+        * NOTE(review): plain strcpy -- assumes blind.dbname and blind.relname
+        * are large enough for these strings; confirm against their
+        * declarations.
+        */
+       strcpy(buf->blind.dbname, (DatabaseName) ? DatabaseName : "Recovery");
+       strcpy(buf->blind.relname, RelationGetPhysicalRelationName(reln));
+
+       INIT_BUFFERTAG(&(buf->tag), reln, blockNum);
+       if (!BufTableInsert(buf))
+       {
+               SpinRelease(BufMgrLock);
+               elog(FATAL, "Buffer in lookup table twice \n");
+       }
+
+       /*
+        * Buffer contents are currently invalid.  Have to mark IO IN PROGRESS
+        * so no one fiddles with them until the read completes.  If this
+        * routine has been called simply to allocate a buffer, no io will be
+        * attempted, so the flag isn't set.
+        *
+        * inProgress is still TRUE here only if we started an output IO on
+        * this buffer in the loop above and never terminated it; in that case
+        * the IO is continued rather than started afresh.
+        */
+       if (!inProgress)
+               StartBufferIO(buf, true);
+       else
+               ContinueBufferIO(buf, true);
+
+#ifdef BMTRACE
+       _bm_trace((reln->rd_rel->relisshared ? 0 : MyDatabaseId), RelationGetRelid(reln), blockNum, BufferDescriptorGetBuffer(buf), BMT_ALLOCNOTFND);
+#endif  /* BMTRACE */
+
+       SpinRelease(BufMgrLock);
+
+       return buf;
+}
+
+/*
+ * WriteBuffer
+ *
+ *             Flag a buffer as modified and give up the caller's pin on it.
+ *             No I/O is done here; the buffer is only marked
+ *             BM_DIRTY | BM_JUST_DIRTIED so that it gets written later.
+ *
+ * The caller must hold a pin on the buffer.  Local buffers are passed
+ * on to WriteLocalBuffer(buffer, TRUE).
+ *
+ * Returns FALSE for a bad shared buffer id, TRUE otherwise (for local
+ * buffers, whatever WriteLocalBuffer returns).
+ *
+ * Side Effects:
+ *             The buffer is unpinned.
+ */
+
+#undef WriteBuffer
+
+int
+WriteBuffer(Buffer buffer)
+{
+       BufferDesc *desc;
+
+       if (BufferIsLocal(buffer))
+               return WriteLocalBuffer(buffer, TRUE);
+       if (BAD_BUFFER_ID(buffer))
+               return FALSE;
+
+       SharedBufferChanged = true;
+       desc = &BufferDescriptors[buffer - 1];
+
+       /* flag update and unpin both happen under BufMgrLock */
+       SpinAcquire(BufMgrLock);
+       Assert(desc->refcount > 0);
+       desc->flags |= (BM_DIRTY | BM_JUST_DIRTIED);
+       UnpinBuffer(desc);
+       SpinRelease(BufMgrLock);
+
+       return TRUE;
+}
+
+/*
+ * WriteNoReleaseBuffer -- mark a buffer dirty but keep the caller's pin.
+ *
+ * Same flag handling as WriteBuffer, except that the buffer is not
+ * unpinned.  Local buffers are passed on to
+ * WriteLocalBuffer(buffer, FALSE).
+ *
+ * Returns STATUS_ERROR for a bad shared buffer id, STATUS_OK otherwise
+ * (for local buffers, whatever WriteLocalBuffer returns).
+ */
+int
+WriteNoReleaseBuffer(Buffer buffer)
+{
+       BufferDesc *desc;
+
+       if (BufferIsLocal(buffer))
+               return WriteLocalBuffer(buffer, FALSE);
+       if (BAD_BUFFER_ID(buffer))
+               return STATUS_ERROR;
+
+       SharedBufferChanged = true;
+       desc = &BufferDescriptors[buffer - 1];
+
+       /* the flags are only touched while holding BufMgrLock */
+       SpinAcquire(BufMgrLock);
+       Assert(desc->refcount > 0);
+       desc->flags |= (BM_DIRTY | BM_JUST_DIRTIED);
+       SpinRelease(BufMgrLock);
+
+       return STATUS_OK;
+}
+
+
+#undef ReleaseAndReadBuffer
+/*
+ * ReleaseAndReadBuffer -- drop the pin on 'buffer', then read block
+ *             'blockNum' of 'relation'.
+ *
+ * Combines ReleaseBuffer() and ReadBuffer() so that, when our last
+ * private pin on a shared buffer goes away, BufMgrLock is taken only
+ * once: it is acquired here and handed, still held, to
+ * ReadBufferWithBufferLock().
+ *
+ * In every other case (local buffer, invalid buffer, or private pins
+ * remaining) the read is done with a plain ReadBuffer().
+ */
+Buffer
+ReleaseAndReadBuffer(Buffer buffer,
+                                        Relation relation,
+                                        BlockNumber blockNum)
+{
+       if (BufferIsLocal(buffer))
+       {
+               Assert(LocalRefCount[-buffer - 1] > 0);
+               LocalRefCount[-buffer - 1]--;
+       }
+       else if (BufferIsValid(buffer))
+       {
+               BufferDesc *desc = &BufferDescriptors[buffer - 1];
+
+               Assert(PrivateRefCount[buffer - 1] > 0);
+               PrivateRefCount[buffer - 1]--;
+               if (PrivateRefCount[buffer - 1] == 0)
+               {
+                       /* last private pin gone: release the shared pin too */
+                       SpinAcquire(BufMgrLock);
+                       Assert(desc->refcount > 0);
+                       desc->refcount--;
+                       if (desc->refcount == 0)
+                       {
+                               AddBufferToFreelist(desc);
+                               desc->flags |= BM_FREE;
+                       }
+                       /* BufMgrLock is still held; tell the reader so */
+                       return ReadBufferWithBufferLock(relation, blockNum, true);
+               }
+       }
+
+       return ReadBuffer(relation, blockNum);
+}
+
+/*
+ * BufferSync -- Write all dirty buffers in the pool.
+ *
+ * This is called at checkpoint time and writes out all dirty buffers.
+ *
+ * Each shared buffer descriptor is visited in turn.  Buffers without
+ * BM_VALID are skipped.  For every other buffer the log is flushed up
+ * to the buffer's LSN, and the page is then written if it is marked
+ * BM_DIRTY or cntxDirty.  A failed write is reported with elog(STOP).
+ */
+void
+BufferSync()
+{
+       int                     i;
+       BufferDesc *bufHdr;
+       Buffer          buffer;
+       int                     status;
+       RelFileNode     rnode;
+       XLogRecPtr      recptr;
+       Relation        reln = NULL;
+
+       for (i = 0, bufHdr = BufferDescriptors; i < NBuffers; i++, bufHdr++)
+       {
+
+               SpinAcquire(BufMgrLock);
+
+               /* nothing to do for a buffer that holds no valid page */
+               if (!(bufHdr->flags & BM_VALID))
+               {
+                       SpinRelease(BufMgrLock);
+                       continue;
+               }
+
+               /*
+                * Pin buffer and ensure that no one reads it from disk
+                */
+               PinBuffer(bufHdr);
+               /* Synchronize with BufferAlloc */
+               if (bufHdr->flags & BM_IO_IN_PROGRESS)
+                       WaitIO(bufHdr, BufMgrLock);
+
+               /* remember identity while we still hold BufMgrLock */
+               buffer = BufferDescriptorGetBuffer(bufHdr);
+               rnode = bufHdr->tag.rnode;
+
+               SpinRelease(BufMgrLock);
+
+               /*
+                * Try to find relation for buffer; reln stays NULL if there is
+                * none, and the blind-write path is used below.
+                */
+               reln = RelationNodeCacheGetRelation(rnode);
+
+               /*
+                * Protect buffer content against concurrent update
+                */
+               LockBuffer(buffer, BUFFER_LOCK_SHARE);
+
+               /*
+                * Force XLOG flush up to the buffer's LSN.  This is done for
+                * every valid buffer, before we even look at its dirty flags.
+                */
+               recptr = BufferGetLSN(bufHdr);
+               XLogFlush(recptr);
+
+               /*
+                * Now it's safe to write buffer to disk
+                * (if needed at all -:))
+                */
+
+               SpinAcquire(BufMgrLock);
+               if (bufHdr->flags & BM_IO_IN_PROGRESS)
+                       WaitIO(bufHdr, BufMgrLock);
+
+               if (bufHdr->flags & BM_DIRTY || bufHdr->cntxDirty)
+               {
+                       /*
+                        * Clear BM_JUST_DIRTIED before writing so that we can tell
+                        * afterwards whether someone dirtied the page again while
+                        * the write was under way.
+                        */
+                       bufHdr->flags &= ~BM_JUST_DIRTIED;
+                       StartBufferIO(bufHdr, false);           /* output IO start */
+
+                       SpinRelease(BufMgrLock);
+
+                       if (reln == (Relation) NULL)
+                       {
+                               status = smgrblindwrt(DEFAULT_SMGR,
+                                                                       bufHdr->tag.rnode,
+                                                                       bufHdr->tag.blockNum,
+                                                                       (char *) MAKE_PTR(bufHdr->data),
+                                                                       true);  /* must fsync */
+                       }
+                       else
+                       {
+                               status = smgrwrite(DEFAULT_SMGR, reln,
+                                                               bufHdr->tag.blockNum,
+                                                               (char *) MAKE_PTR(bufHdr->data));
+                       }
+
+                       if (status == SM_FAIL)  /* disk failure ?! */
+                               elog(STOP, "BufferSync: cannot write %u for %s",
+                                        bufHdr->tag.blockNum, bufHdr->blind.relname);
+
+                       /*
+                        * Note that it's safe to change cntxDirty here because
+                        * we protect it from upper writers by share lock and from
+                        * other bufmgr routines by BM_IO_IN_PROGRESS
+                        */
+                       bufHdr->cntxDirty = false;
+
+                       /*
+                        * Release the per-buffer readlock, reacquire BufMgrLock.
+                        */
+                       LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
+                       BufferFlushCount++;
+
+                       SpinAcquire(BufMgrLock);
+
+                       bufHdr->flags &= ~BM_IO_IN_PROGRESS;    /* mark IO finished */
+                       TerminateBufferIO(bufHdr);                              /* Sync IO finished */
+
+                       /*
+                        * If this buffer was marked by someone as DIRTY while
+                        * we were flushing it out we must not clear DIRTY
+                        * flag - vadim 01/17/97
+                        */
+                       if (!(bufHdr->flags & BM_JUST_DIRTIED))
+                               bufHdr->flags &= ~BM_DIRTY;
+               }
+               else
+                       LockBuffer(buffer, BUFFER_LOCK_UNLOCK); /* clean: just drop the share lock */
+
+               /* BufMgrLock is held on both branches at this point */
+               UnpinBuffer(bufHdr);
+
+               SpinRelease(BufMgrLock);
+
+               /* drop refcnt obtained by RelationNodeCacheGetRelation */
+               if (reln != (Relation) NULL)
+               {
+                       RelationDecrementReferenceCount(reln);
+                       reln = NULL;
+               }
+       }
+
+}
+
+/*
+ * WaitIO -- Block until the IO_IN_PROGRESS flag on 'buf' is cleared.
+ *
+ * Should be entered with buffer manager spinlock held; releases it before
+ * waiting and re-acquires it afterwards.
+ *
+ * OLD NOTES:
+ *             Because IO_IN_PROGRESS conflicts are
+ *             expected to be rare, there is only one BufferIO
+ *             lock in the entire system.      All processes block
+ *             on this semaphore when they try to use a buffer
+ *             that someone else is faulting in.  Whenever a
+ *             process finishes an IO and someone is waiting for
+ *             the buffer, BufferIO is signaled (SignalIO).  All
+ *             waiting processes then wake up and check to see
+ *             if their buffer is now ready.  This implementation
+ *             is simple, but efficient enough if WaitIO is
+ *             rarely called by multiple processes simultaneously.
+ *
+ * NEW NOTES:
+ *             The above is true only on machines without test-and-set
+ *             semaphores (which we hope are few, these days).  On better
+ *             hardware, each buffer has a spinlock that we can wait on.
+ */
+#ifdef HAS_TEST_AND_SET
+
+/*
+ * Test-and-set variant: wait on the buffer's own io_in_progress_lock.
+ *
+ * 'spinlock' is the lock the caller holds on entry; it is released
+ * around each wait and held again on return.
+ */
+static void
+WaitIO(BufferDesc *buf, SPINLOCK spinlock)
+{
+
+       /*
+        * Changed to wait until there's no IO - Inoue 01/13/2000
+        */
+       while ((buf->flags & BM_IO_IN_PROGRESS) != 0)
+       {
+               SpinRelease(spinlock);
+               /*
+                * Acquire and immediately release the per-buffer lock: we only
+                * want to block until its current holder lets go (presumably the
+                * backend doing the IO -- confirm against StartBufferIO and
+                * TerminateBufferIO).
+                */
+               S_LOCK(&(buf->io_in_progress_lock));
+               S_UNLOCK(&(buf->io_in_progress_lock));
+               SpinAcquire(spinlock);
+       }
+}
+
+#else                                                  /* !HAS_TEST_AND_SET */
+
+/* semaphore ids used when test-and-set spinlocks are not available */
+IpcSemaphoreId WaitIOSemId;
+IpcSemaphoreId WaitCLSemId;
+
+/*
+ * Semaphore variant: all waiters block on the single WaitIOSemId
+ * semaphore and recheck their own buffer each time they are woken.
+ *
+ * 'spinlock' is the lock the caller holds on entry; it is released
+ * around each wait and held again on return.
+ */
+static void
+WaitIO(BufferDesc *buf, SPINLOCK spinlock)
+{
+       bool            inProgress;
+
+       for (;;)
+       {
+
+               /* wait until someone releases IO lock */
+               (*NWaitIOBackendP)++;   /* count ourselves as a waiter (see SignalIO) */
+               SpinRelease(spinlock);
+               IpcSemaphoreLock(WaitIOSemId, 0, 1);
+               SpinAcquire(spinlock);
+               /* the wakeup may have been for another buffer, so recheck ours */
+               inProgress = (buf->flags & BM_IO_IN_PROGRESS);
+               if (!inProgress)
+                       break;
+       }
+}
+
+/*
+ * SignalIO
+ *
+ * Releases WaitIOSemId once for every backend counted in
+ * *NWaitIOBackendP, then resets that counter to zero.  The wakeup is
+ * not specific to 'buf'; 'buf' is used only by the assertion.
+ */
+static void
+SignalIO(BufferDesc *buf)
+{
+       /* somebody better be waiting. */
+       Assert(buf->refcount > 1);
+       IpcSemaphoreUnlock(WaitIOSemId, 0, *NWaitIOBackendP);
+       *NWaitIOBackendP = 0;
+}
+
+#endif  /* HAS_TEST_AND_SET */
+
+/*
+ * Some I/O is done by direct file access, bypassing bufmgr (e.g., I/O
+ * in psort and hashjoin).  These counters are reported by
+ * PrintBufferUsage as "Direct blocks".
+ */
+long           NDirectFileRead;
+long           NDirectFileWrite;
+
+/*
+ * PrintBufferUsage
+ *
+ * Write three statistics lines to 'statfp': shared blocks, local
+ * blocks, and direct (non-bufmgr) blocks.  The "read" figure for
+ * shared and local blocks is the read count minus the hit count.
+ * A hit rate is reported as 0 when the matching read count is 0.
+ */
+void
+PrintBufferUsage(FILE *statfp)
+{
+       float           sharedPct;
+       float           localPct;
+
+       /* guard against dividing by a zero read count */
+       sharedPct = (ReadBufferCount == 0) ? 0.0 :
+               (float) BufferHitCount *100.0 / ReadBufferCount;
+       localPct = (ReadLocalBufferCount == 0) ? 0.0 :
+               (float) LocalBufferHitCount *100.0 / ReadLocalBufferCount;
+
+       fprintf(statfp, "!\tShared blocks: %10ld read, %10ld written, buffer hit rate = %.2f%%\n",
+                       ReadBufferCount - BufferHitCount, BufferFlushCount, sharedPct);
+       fprintf(statfp, "!\tLocal  blocks: %10ld read, %10ld written, buffer hit rate = %.2f%%\n",
+                       ReadLocalBufferCount - LocalBufferHitCount, LocalBufferFlushCount, localPct);
+       fprintf(statfp, "!\tDirect blocks: %10ld read, %10ld written\n",
+                       NDirectFileRead, NDirectFileWrite);
+}
+
+/*
+ * ResetBufferUsage
+ *
+ * Zero every counter that PrintBufferUsage reports.
+ */
+void
+ResetBufferUsage()
+{
+       /* shared buffer pool */
+       ReadBufferCount = 0;
+       BufferHitCount = 0;
+       BufferFlushCount = 0;
+
+       /* local buffer pool */
+       ReadLocalBufferCount = 0;
+       LocalBufferHitCount = 0;
+       LocalBufferFlushCount = 0;
+
+       /* direct file access */
+       NDirectFileRead = 0;
+       NDirectFileWrite = 0;
+}
+
+/* ----------------------------------------------
+ *             ResetBufferPool
+ *
+ *             Meant to be called at transaction abort: gives up every
+ *             shared-buffer pin recorded in PrivateRefCount and resets
+ *             the local buffer pool.  It is also called at commit when
+ *             BufferPoolCheckLeak has reported a problem; isCommit is
+ *             then TRUE and only the pin counts are cleaned up.
+ *
+ * When isCommit is false, smgrabort() is called as well: pending fsync
+ * requests are forgotten.  Dirtied buffers will still get written,
+ * eventually, but there will be no fsync for them.
+ *
+ * ----------------------------------------------
+ */
+void
+ResetBufferPool(bool isCommit)
+{
+       int                     bufno;
+
+       for (bufno = 0; bufno < NBuffers; bufno++)
+       {
+               BufferDesc *desc;
+
+               /* nothing pinned by us in this slot */
+               if (PrivateRefCount[bufno] == 0)
+                       continue;
+
+               desc = &BufferDescriptors[bufno];
+
+               /* however many private pins we had, they cost one shared pin */
+               SpinAcquire(BufMgrLock);
+               Assert(desc->refcount > 0);
+               desc->refcount--;
+               if (desc->refcount == 0)
+               {
+                       AddBufferToFreelist(desc);
+                       desc->flags |= BM_FREE;
+               }
+               SpinRelease(BufMgrLock);
+
+               PrivateRefCount[bufno] = 0;
+       }
+
+       ResetLocalBufferPool();
+
+       if (!isCommit)
+               smgrabort();
+}
+
+/* -----------------------------------------------
+ *             BufferPoolCheckLeak
+ *
+ *             Check for leaked buffer pins: emit one NOTICE for every
+ *             shared buffer whose PrivateRefCount entry is nonzero.
+ *
+ *             Returns 1 if at least one such buffer was found, 0
+ *             otherwise.  Nothing is released here; this only reports.
+ *
+ * -----------------------------------------------
+ */
+int
+BufferPoolCheckLeak()
+{
+       int                     i;
+       int                     result = 0;
+
+       for (i = 0; i < NBuffers; i++)
+       {
+               BufferDesc *buf;
+
+               if (PrivateRefCount[i] == 0)
+                       continue;
+
+               buf = &BufferDescriptors[i];
+
+               /*
+                * blockNum is printed with %u, as in the other messages that
+                * report buf->tag.blockNum; %d would show large block numbers
+                * as negative.  The message is built from adjacent string
+                * literals rather than a backslash-continued one.
+                */
+               elog(NOTICE,
+                        "Buffer Leak: [%03d] (freeNext=%ld, freePrev=%ld, "
+                        "relname=%s, blockNum=%u, flags=0x%x, refcount=%d %ld)",
+                        i, buf->freeNext, buf->freePrev,
+                        buf->blind.relname, buf->tag.blockNum, buf->flags,
+                        buf->refcount, PrivateRefCount[i]);
+               result = 1;
+       }
+       return result;
+}
+
+/* ------------------------------------------------
+ * FlushBufferPool
+ *
+ * Flush all dirty blocks in buffer pool to disk
+ * at the checkpoint time: BufferSync() writes the
+ * dirty shared buffers, then smgrsync() is called.
+ * ------------------------------------------------
+ */
+void
+FlushBufferPool(void)
+{
+       BufferSync();
+       smgrsync();
+}
+
+/*
+ * BufmgrCommit
+ *
+ * At commit time we have to flush the local buffer pool only: sync
+ * the local buffers, then call smgrcommit().  Shared buffers are not
+ * written here.
+ */
+void
+BufmgrCommit(void)
+{
+       LocalBufferSync();
+       smgrcommit();
+}
+
+/*
+ * BufferGetBlockNumber
+ *             Returns the block number stored in the tag of a buffer's
+ *             descriptor.  Local buffers (negative ids) are looked up in
+ *             LocalBufferDescriptors, shared ones in BufferDescriptors.
+ *
+ * Note:
+ *             Assumes that the buffer is valid.  No lock is taken here.
+ */
+BlockNumber
+BufferGetBlockNumber(Buffer buffer)
+{
+       Assert(BufferIsValid(buffer));
+
+       /* XXX should be a critical section */
+       if (!BufferIsLocal(buffer))
+               return BufferDescriptors[buffer - 1].tag.blockNum;
+
+       return LocalBufferDescriptors[-buffer - 1].tag.blockNum;
+}
+
+/*
+ * BufferReplace
+ *
+ * Write out the page held by 'bufHdr'.
+ *
+ * BufMgrLock must be held at entry, and the buffer must be pinned.
+ * The lock is released while the log flush and the write are done and
+ * is held again on return.
+ *
+ * Returns TRUE if the write succeeded, FALSE if the storage manager
+ * reported SM_FAIL.  BufferFlushCount is bumped only on success.
+ */
+static int
+BufferReplace(BufferDesc *bufHdr)
+{
+       Relation        rel;
+       XLogRecPtr      lsn;
+       int                     rc;
+
+       /* To check if block content changed while flushing. - vadim 01/17/97 */
+       bufHdr->flags &= ~BM_JUST_DIRTIED;
+
+       SpinRelease(BufMgrLock);
+
+       /*
+        * No need to lock buffer context - no one should be able to
+        * end ReadBuffer
+        */
+       lsn = BufferGetLSN(bufHdr);
+       XLogFlush(lsn);
+
+       rel = RelationNodeCacheGetRelation(bufHdr->tag.rnode);
+
+       if (rel == (Relation) NULL)
+       {
+               /* no relation found for this file node: write blind */
+               rc = smgrblindwrt(DEFAULT_SMGR, bufHdr->tag.rnode,
+                                                 bufHdr->tag.blockNum,
+                                                 (char *) MAKE_PTR(bufHdr->data),
+                                                 false);       /* no fsync */
+       }
+       else
+       {
+               rc = smgrwrite(DEFAULT_SMGR, rel, bufHdr->tag.blockNum,
+                                          (char *) MAKE_PTR(bufHdr->data));
+
+               /* drop relcache refcnt incremented by RelationNodeCacheGetRelation */
+               RelationDecrementReferenceCount(rel);
+       }
+
+       SpinAcquire(BufMgrLock);
+
+       if (rc != SM_FAIL)
+       {
+               BufferFlushCount++;
+               return TRUE;
+       }
+
+       return FALSE;
+}
+
+/*
+ * RelationGetNumberOfBlocks
+ *             Returns the number of blocks in a relation.
+ *
+ * For a relation with rd_myxactonly set, the relation's own rd_nblocks
+ * field is returned; otherwise the storage manager is asked.
+ *
+ * Note:
+ *             XXX may fail for huge relations.
+ *             XXX should be elsewhere.
+ *             XXX maybe should be hidden
+ */
+BlockNumber
+RelationGetNumberOfBlocks(Relation relation)
+{
+       if (relation->rd_myxactonly)
+               return relation->rd_nblocks;
+
+       return smgrnblocks(DEFAULT_SMGR, relation);
+}
+
+/* ---------------------------------------------------------------------
+ *             ReleaseRelationBuffers
+ *
+ *             This function removes all the buffered pages for a relation
+ *             from the buffer pool.  Dirty pages are simply dropped, without
+ *             bothering to write them out first.  This is used when the
+ *             relation is about to be deleted.  We assume that the caller
+ *             holds an exclusive lock on the relation, which should assure
+ *             that no new buffers will be acquired for the rel meanwhile.
+ *
+ *             Buffers are matched by comparing their tag's rnode with
+ *             rel->rd_node.  For an rd_myxactonly relation only the local
+ *             buffer pool is scanned; otherwise only the shared pool is.
+ *
+ *             XXX currently it sequentially searches the buffer pool, should be
+ *             changed to more clever ways of searching.
+ * --------------------------------------------------------------------
+ */
+void
+ReleaseRelationBuffers(Relation rel)
+{
+       int                     i;
+       BufferDesc *bufHdr;
+
+       if (rel->rd_myxactonly)
+       {
+               /* local buffers: no BufMgrLock is taken on this path */
+               for (i = 0; i < NLocBuffer; i++)
+               {
+                       bufHdr = &LocalBufferDescriptors[i];
+                       if (RelFileNodeEquals(bufHdr->tag.rnode, rel->rd_node))
+                       {
+                               bufHdr->flags &= ~(BM_DIRTY | BM_JUST_DIRTIED);
+                               bufHdr->cntxDirty = false;
+                               LocalRefCount[i] = 0;
+                               /* invalidate the tag so this slot no longer matches the rel */
+                               bufHdr->tag.rnode.relNode = InvalidOid;
+                       }
+               }
+               return;
+       }
+
+       SpinAcquire(BufMgrLock);
+       for (i = 1; i <= NBuffers; i++)
+       {
+               /* i is a 1-based buffer id; the descriptor array is 0-based */
+               bufHdr = &BufferDescriptors[i - 1];
+recheck:
+               if (RelFileNodeEquals(bufHdr->tag.rnode, rel->rd_node))
+               {
+
+                       /*
+                        * If there is I/O in progress, better wait till it's done;
+                        * don't want to delete the relation out from under someone
+                        * who's just trying to flush the buffer!
+                        */
+                       if (bufHdr->flags & BM_IO_IN_PROGRESS)
+                       {
+                               WaitIO(bufHdr, BufMgrLock);
+
+                               /*
+                                * By now, the buffer very possibly belongs to some other
+                                * rel, so check again before proceeding.
+                                */
+                               goto recheck;
+                       }
+                       /* Now we can do what we came for */
+                       bufHdr->flags &= ~(BM_DIRTY | BM_JUST_DIRTIED);
+                       bufHdr->cntxDirty = false;
+
+                       /*
+                        * Release any refcount we may have.
+                        *
+                        * This is very probably dead code, and if it isn't then it's
+                        * probably wrong.      I added the Assert to find out --- tgl
+                        * 11/99.
+                        */
+                       if (!(bufHdr->flags & BM_FREE))
+                       {
+                               /* Assert checks that buffer will actually get freed! */
+                               Assert(PrivateRefCount[i - 1] == 1 &&
+                                          bufHdr->refcount == 1);
+                               /* ReleaseBuffer expects we do not hold the lock at entry */
+                               SpinRelease(BufMgrLock);
+                               ReleaseBuffer(i);
+                               SpinAcquire(BufMgrLock);
+                               /*
+                                * NOTE(review): BufMgrLock was dropped just above, and the
+                                * tag is not rechecked before the delete below.
+                                */
+                       }
+                       /*
+                        * And mark the buffer as no longer occupied by this rel.
+                        * The return value of BufTableDelete is ignored here.
+                        */
+                       BufTableDelete(bufHdr);
+               }
+       }
+
+       SpinRelease(BufMgrLock);
+}
+
+/* ---------------------------------------------------------------------
+ *             DropBuffers
+ *
+ *             This function removes all the buffers in the buffer cache for a
+ *             particular database.  Dirty pages are simply dropped, without
+ *             bothering to write them out first.  This is used when we destroy a
+ *             database, to avoid trying to flush data to disk when the directory
+ *             tree no longer exists.  Implementation is pretty similar to
+ *             ReleaseRelationBuffers() which is for destroying just one relation.
+ * --------------------------------------------------------------------
+ */
+void
+DropBuffers(Oid dbid)
+{
+       int                     bufno;
+
+       SpinAcquire(BufMgrLock);
+       for (bufno = 0; bufno < NBuffers; bufno++)
+       {
+               BufferDesc *desc = &BufferDescriptors[bufno];
+
+               /*
+                * tblNode currently holds the database OID; that may change in
+                * future, at which point this routine would drop tablespace
+                * buffers instead.
+                *
+                * While the buffer belongs to the target DB and has I/O in
+                * progress, wait for the I/O to finish: we must not delete the
+                * database out from under someone who is just flushing the
+                * buffer.  The ownership test is repeated after every wait since
+                * the buffer may well belong to some other DB by then.
+                */
+               while (desc->tag.rnode.tblNode == dbid &&
+                          (desc->flags & BM_IO_IN_PROGRESS))
+                       WaitIO(desc, BufMgrLock);
+
+               if (desc->tag.rnode.tblNode != dbid)
+                       continue;
+
+               /* Throw away the page contents without writing them out. */
+               desc->flags &= ~(BM_DIRTY | BM_JUST_DIRTIED);
+               desc->cntxDirty = false;
+
+               /*
+                * Nobody should have the buffer pinned, provided the caller
+                * verified that no backends are running in that database.
+                */
+               Assert(desc->flags & BM_FREE);
+
+               /* Finally, mark the buffer as no longer occupied by this page. */
+               BufTableDelete(desc);
+       }
+       SpinRelease(BufMgrLock);
+}
+
+/* -----------------------------------------------------------------
+ *             PrintBufferDescs
+ *
+ *             this function prints all the buffer descriptors, for debugging
+ *             use only.
+ * -----------------------------------------------------------------
+ */
+void
+PrintBufferDescs()
+{
+       int                     slot;
+       BufferDesc *desc;
+
+       if (!IsUnderPostmaster)
+       {
+               /* interactive backend: plain printf, BufMgrLock is not taken */
+               for (slot = 0; slot < NBuffers; slot++)
+               {
+                       desc = &BufferDescriptors[slot];
+                       printf("[%-2d] (%s, %d) flags=0x%x, refcnt=%d %ld)\n",
+                                       slot, desc->blind.relname, desc->tag.blockNum,
+                                       desc->flags, desc->refcount, PrivateRefCount[slot]);
+               }
+               return;
+       }
+
+       /* running under the postmaster: report via elog while holding BufMgrLock */
+       SpinAcquire(BufMgrLock);
+       for (slot = 0; slot < NBuffers; slot++)
+       {
+               desc = &BufferDescriptors[slot];
+               elog(DEBUG, "[%02d] (freeNext=%ld, freePrev=%ld, relname=%s, \
+blockNum=%d, flags=0x%x, refcount=%d %ld)",
+                        slot, desc->freeNext, desc->freePrev,
+                        desc->blind.relname, desc->tag.blockNum, desc->flags,
+                        desc->refcount, PrivateRefCount[slot]);
+       }
+       SpinRelease(BufMgrLock);
+}
+
+/*
+ * PrintPinnedBufs
+ *
+ *             Emit a NOTICE for every shared buffer on which this backend's
+ *             PrivateRefCount entry is positive.  BufMgrLock is held for the
+ *             duration of the scan.
+ */
+void
+PrintPinnedBufs()
+{
+       int                     slot;
+
+       SpinAcquire(BufMgrLock);
+       for (slot = 0; slot < NBuffers; slot++)
+       {
+               BufferDesc *desc = &BufferDescriptors[slot];
+
+               if (PrivateRefCount[slot] <= 0)
+                       continue;
+
+               elog(NOTICE, "[%02d] (freeNext=%ld, freePrev=%ld, relname=%s, \
+blockNum=%d, flags=0x%x, refcount=%d %ld)\n",
+                        slot, desc->freeNext, desc->freePrev, desc->blind.relname,
+                        desc->tag.blockNum, desc->flags,
+                        desc->refcount, PrivateRefCount[slot]);
+       }
+       SpinRelease(BufMgrLock);
+}
+
+/*
+ * BufferPoolBlowaway
+ *
+ * this routine is solely for the purpose of experiments -- sometimes
+ * you may want to blowaway whatever is left from the past in buffer
+ * pool and start measuring some performance with a clean empty buffer
+ * pool.
+ */
+#ifdef NOT_USED
+void
+BufferPoolBlowaway()
+{
+       int                     bufno;
+
+       BufferSync();
+       for (bufno = 1; bufno <= NBuffers; bufno++)
+       {
+               /* keep releasing until the buffer no longer tests as valid */
+               while (BufferIsValid(bufno))
+                       ReleaseBuffer(bufno);
+
+               /* buffer numbers are 1-based, the descriptor array is 0-based */
+               BufTableDelete(&BufferDescriptors[bufno - 1]);
+       }
+}
+
+#endif
+
+/* ---------------------------------------------------------------------
+ *             FlushRelationBuffers
+ *
+ *             This function flushes all dirty pages of a relation out to disk.
+ *             Furthermore, pages that have blocknumber >= firstDelBlock are
+ *             actually removed from the buffer pool.  An error code is returned
+ *             if we fail to dump a dirty buffer or if we find one of
+ *             the target pages is pinned into the cache.
+ *
+ *             This is used by VACUUM before truncating the relation to the given
+ *             number of blocks.  (TRUNCATE TABLE also uses it in the same way.)
+ *             It might seem unnecessary to flush dirty pages before firstDelBlock,
+ *             since VACUUM should already have committed its changes.  However,
+ *             it is possible for there still to be dirty pages: if some page
+ *             had unwritten on-row tuple status updates from a prior transaction,
+ *             and VACUUM had no additional changes to make to that page, then
+ *             VACUUM won't have written it.  This is harmless in most cases but
+ *             will break pg_upgrade, which relies on VACUUM to ensure that *all*
+ *             tuples have correct on-row status.  So, we check and flush all
+ *             dirty pages of the rel regardless of block number.
+ *
+ *             This is also used by RENAME TABLE (with firstDelBlock = 0)
+ *             to clear out the buffer cache before renaming the physical files of
+ *             a relation.  Without that, some other backend might try to do a
+ *             blind write of a buffer page (relying on the BlindId of the buffer)
+ *             and fail because it's not got the right filename anymore.
+ *
+ *             In all cases, the caller should be holding AccessExclusiveLock on
+ *             the target relation to ensure that no other backend is busy reading
+ *             more blocks of the relation.
+ *
+ *             Formerly, we considered it an error condition if we found dirty
+ *             buffers here.   However, since BufferSync no longer forces out all
+ *             dirty buffers at every xact commit, it's possible for dirty buffers
+ *             to still be present in the cache due to failure of an earlier
+ *             transaction.  So, must flush dirty buffers without complaint.
+ *
+ *             Returns: 0 - Ok, -1 - FAILED TO WRITE DIRTY BUFFER, -2 - PINNED
+ *
+ *             XXX currently it sequentially searches the buffer pool, should be
+ *             changed to more clever ways of searching.
+ * --------------------------------------------------------------------
+ */
+int
+FlushRelationBuffers(Relation rel, BlockNumber firstDelBlock)
+{
+       int                     i;
+       BufferDesc *bufHdr;
+       XLogRecPtr      recptr;
+       int                     status;
+
+       /*
+        * Local-buffer path: pages are written directly with smgrwrite; neither
+        * BufMgrLock nor XLogFlush is involved here.
+        */
+       if (rel->rd_myxactonly)
+       {
+               for (i = 0; i < NLocBuffer; i++)
+               {
+                       bufHdr = &LocalBufferDescriptors[i];
+                       if (RelFileNodeEquals(bufHdr->tag.rnode, rel->rd_node))
+                       {
+                               if (bufHdr->flags & BM_DIRTY || bufHdr->cntxDirty)
+                               {
+                                       status = smgrwrite(DEFAULT_SMGR, rel, 
+                                                               bufHdr->tag.blockNum,
+                                                               (char *) MAKE_PTR(bufHdr->data));
+                                       /* write failure on a local buffer: NOTICE and return -1 */
+                                       if (status == SM_FAIL)
+                                       {
+                                               elog(NOTICE, "FlushRelationBuffers(%s (local), %u): block %u is dirty, could not flush it",
+                                                        RelationGetRelationName(rel), firstDelBlock,
+                                                        bufHdr->tag.blockNum);
+                                               return(-1);
+                                       }
+                                       bufHdr->flags &= ~(BM_DIRTY | BM_JUST_DIRTIED);
+                                       bufHdr->cntxDirty = false;
+                               }
+                               /* a still-referenced local buffer is reported as -2 */
+                               if (LocalRefCount[i] > 0)
+                               {
+                                       elog(NOTICE, "FlushRelationBuffers(%s (local), %u): block %u is referenced (%ld)",
+                                                RelationGetRelationName(rel), firstDelBlock,
+                                                bufHdr->tag.blockNum, LocalRefCount[i]);
+                                       return(-2);
+                               }
+                               /* only blocks at or beyond firstDelBlock are dropped */
+                               if (bufHdr->tag.blockNum >= firstDelBlock)
+                               {
+                                       bufHdr->tag.rnode.relNode = InvalidOid;
+                               }
+                       }
+               }
+               return 0;
+       }
+
+       /* Shared-buffer path: scan the whole pool under BufMgrLock. */
+       SpinAcquire(BufMgrLock);
+       for (i = 0; i < NBuffers; i++)
+       {
+               bufHdr = &BufferDescriptors[i];
+               if (RelFileNodeEquals(bufHdr->tag.rnode, rel->rd_node))
+               {
+                       if (bufHdr->flags & BM_DIRTY || bufHdr->cntxDirty)
+                       {
+                               /*
+                                * Pin the buffer before BufMgrLock is dropped below
+                                * (presumably so it cannot be reassigned while we are
+                                * busy flushing XLOG -- TODO confirm).
+                                */
+                               PinBuffer(bufHdr);
+                               if (bufHdr->flags & BM_IO_IN_PROGRESS)
+                                       WaitIO(bufHdr, BufMgrLock);
+                               SpinRelease(BufMgrLock);
+
+                               /*
+                                * Force XLOG flush for buffer's LSN
+                                */
+                               recptr = BufferGetLSN(bufHdr);
+                               XLogFlush(recptr);
+
+                               /*
+                                * Now it's safe to write buffer to disk
+                                */
+
+                               SpinAcquire(BufMgrLock);
+                               if (bufHdr->flags & BM_IO_IN_PROGRESS)
+                                       WaitIO(bufHdr, BufMgrLock);
+
+                               /*
+                                * Dirtiness is tested again because BufMgrLock was
+                                * released around XLogFlush, so the flags may have
+                                * changed in the meantime.
+                                */
+                               if (bufHdr->flags & BM_DIRTY || bufHdr->cntxDirty)
+                               {
+                                       bufHdr->flags &= ~BM_JUST_DIRTIED;
+                                       StartBufferIO(bufHdr, false);           /* output IO start */
+
+                                       /* the write itself is done without BufMgrLock */
+                                       SpinRelease(BufMgrLock);
+
+                                       status = smgrwrite(DEFAULT_SMGR, rel,
+                                                                       bufHdr->tag.blockNum,
+                                                                       (char *) MAKE_PTR(bufHdr->data));
+
+                                       if (status == SM_FAIL)  /* disk failure ?! */
+                                               elog(STOP, "FlushRelationBuffers: cannot write %u for %s",
+                                                        bufHdr->tag.blockNum, bufHdr->blind.relname);
+
+                                       BufferFlushCount++;
+
+                                       SpinAcquire(BufMgrLock);
+                                       bufHdr->flags &= ~BM_IO_IN_PROGRESS;
+                                       TerminateBufferIO(bufHdr);
+                                       Assert(!(bufHdr->flags & BM_JUST_DIRTIED));
+                                       bufHdr->flags &= ~BM_DIRTY;
+                                       /*
+                                        * Note that it's safe to change cntxDirty here because
+                                        * we protect it from upper writers by
+                                        * AccessExclusiveLock and from other bufmgr routines
+                                        * by BM_IO_IN_PROGRESS
+                                        */
+                                       bufHdr->cntxDirty = false;
+                               }
+                               UnpinBuffer(bufHdr);
+                       }
+                       /*
+                        * The buffer is not marked BM_FREE, i.e. it is still
+                        * referenced: drop the lock, report it and fail with -2.
+                        */
+                       if (!(bufHdr->flags & BM_FREE))
+                       {
+                               SpinRelease(BufMgrLock);
+                               elog(NOTICE, "FlushRelationBuffers(%s, %u): block %u is referenced (private %ld, global %d)",
+                                        RelationGetRelationName(rel), firstDelBlock,
+                                        bufHdr->tag.blockNum,
+                                        PrivateRefCount[i], bufHdr->refcount);
+                               return -2;
+                       }
+                       /* only blocks at or beyond firstDelBlock leave the buffer table */
+                       if (bufHdr->tag.blockNum >= firstDelBlock)
+                       {
+                               BufTableDelete(bufHdr);
+                       }
+               }
+       }
+       SpinRelease(BufMgrLock);
+       return 0;
+}
+
+#undef ReleaseBuffer
+
+/*
+ * ReleaseBuffer -- remove the pin on a buffer without
+ *             marking it dirty.
+ *
+ */
+int
+ReleaseBuffer(Buffer buffer)
+{
+       BufferDesc *desc;
+       Buffer          slot;
+
+       /* Local buffers only carry a backend-private count; no locking needed. */
+       if (BufferIsLocal(buffer))
+       {
+               slot = -buffer - 1;
+               Assert(LocalRefCount[slot] > 0);
+               LocalRefCount[slot]--;
+               return STATUS_OK;
+       }
+
+       if (BAD_BUFFER_ID(buffer))
+               return STATUS_ERROR;
+
+       slot = buffer - 1;
+       desc = &BufferDescriptors[slot];
+
+       /* Drop one private pin; the shared count is touched only on the last one. */
+       Assert(PrivateRefCount[slot] > 0);
+       if (--PrivateRefCount[slot] != 0)
+               return STATUS_OK;
+
+       SpinAcquire(BufMgrLock);
+       Assert(desc->refcount > 0);
+       if (--desc->refcount == 0)
+       {
+               /* nobody references the buffer any more: put it on the freelist */
+               AddBufferToFreelist(desc);
+               desc->flags |= BM_FREE;
+       }
+       SpinRelease(BufMgrLock);
+
+       return STATUS_OK;
+}
+
+#ifdef NOT_USED
+/*
+ * IncrBufferRefCount_Debug -- call IncrBufferRefCount and, when ShowPinTrace
+ *             is set and the buffer is a non-local user buffer, log the pin to
+ *             stderr together with the caller's file and line.
+ */
+void
+IncrBufferRefCount_Debug(char *file, int line, Buffer buffer)
+{
+       BufferDesc *desc;
+
+       IncrBufferRefCount(buffer);
+
+       if (!ShowPinTrace || BufferIsLocal(buffer) || !is_userbuffer(buffer))
+               return;
+
+       desc = &BufferDescriptors[buffer - 1];
+       fprintf(stderr, "PIN(Incr) %ld relname = %s, blockNum = %d, \
+refcount = %ld, file: %s, line: %d\n",
+                       buffer, desc->blind.relname, desc->tag.blockNum,
+                       PrivateRefCount[buffer - 1], file, line);
+}
+
+#endif
+
+#ifdef NOT_USED
+/*
+ * ReleaseBuffer_Debug -- call ReleaseBuffer and, when ShowPinTrace is set
+ *             and the buffer is a non-local user buffer, log the unpin to stderr
+ *             together with the caller's file and line.
+ */
+void
+ReleaseBuffer_Debug(char *file, int line, Buffer buffer)
+{
+       BufferDesc *desc;
+
+       ReleaseBuffer(buffer);
+
+       if (!ShowPinTrace || BufferIsLocal(buffer) || !is_userbuffer(buffer))
+               return;
+
+       desc = &BufferDescriptors[buffer - 1];
+       fprintf(stderr, "UNPIN(Rel) %ld relname = %s, blockNum = %d, \
+refcount = %ld, file: %s, line: %d\n",
+                       buffer, desc->blind.relname, desc->tag.blockNum,
+                       PrivateRefCount[buffer - 1], file, line);
+}
+
+#endif
+
+#ifdef NOT_USED
+/*
+ * ReleaseAndReadBuffer_Debug -- pin-tracing wrapper for ReleaseAndReadBuffer.
+ *
+ *             file, line      - caller's source position, echoed in the trace output
+ *             buffer          - buffer handed to ReleaseAndReadBuffer for release
+ *             relation, blockNum - page requested from ReleaseAndReadBuffer
+ *
+ *             Returns whatever ReleaseAndReadBuffer returned.
+ *
+ *             When ShowPinTrace is set, an UNPIN line is written for the old
+ *             buffer and a PIN line for the new one.  Both traces index the
+ *             shared BufferDescriptors/PrivateRefCount arrays with
+ *             (buffer number - 1), so they must be restricted to valid,
+ *             non-local buffers -- the same rule IncrBufferRefCount_Debug and
+ *             ReleaseBuffer_Debug follow.
+ */
+int
+ReleaseAndReadBuffer_Debug(char *file,
+                                                  int line,
+                                                  Buffer buffer,
+                                                  Relation relation,
+                                                  BlockNumber blockNum)
+{
+       bool            bufferValid;
+       Buffer          b;
+
+       /* remember validity now; the call below releases the old buffer */
+       bufferValid = BufferIsValid(buffer);
+       b = ReleaseAndReadBuffer(buffer, relation, blockNum);
+
+       /* trace the release of the old buffer (shared buffers only) */
+       if (ShowPinTrace && bufferValid && !BufferIsLocal(buffer)
+               && is_userbuffer(buffer))
+       {
+               BufferDesc *buf = &BufferDescriptors[buffer - 1];
+
+               fprintf(stderr, "UNPIN(Rel&Rd) %ld relname = %s, blockNum = %d, \
+refcount = %ld, file: %s, line: %d\n",
+                               buffer, buf->blind.relname, buf->tag.blockNum,
+                               PrivateRefCount[buffer - 1], file, line);
+       }
+
+       /*
+        * Trace the pin on the new buffer.  The tests are made on b, the
+        * buffer actually being reported, not on the old one.
+        */
+       if (ShowPinTrace && BufferIsValid(b) && !BufferIsLocal(b)
+               && is_userbuffer(b))
+       {
+               BufferDesc *buf = &BufferDescriptors[b - 1];
+
+               fprintf(stderr, "PIN(Rel&Rd) %ld relname = %s, blockNum = %d, \
+refcount = %ld, file: %s, line: %d\n",
+                               b, buf->blind.relname, buf->tag.blockNum,
+                               PrivateRefCount[b - 1], file, line);
+       }
+       return b;
+}
+
+#endif
+
+#ifdef BMTRACE
+
+/*
+ *     trace allocations and deallocations in a circular buffer in
+ *     shared memory.  check the buffer before doing the allocation,
+ *     and die if there's anything fishy.
+ */
+
+_bm_trace(Oid dbId, Oid relId, int blkNo, int bufNo, int allocType)
+{
+       long            head;
+       long            pos;
+       bmtrace    *entry;
+
+       head = *CurTraceBuf;
+
+       /* begin with the slot just before the current one, wrapping at zero */
+       pos = (head > 0) ? head - 1 : BMT_LIMIT - 1;
+
+       /*
+        * Walk backwards through the ring looking for the latest used entry
+        * that mentions bufNo.  It is acceptable if it was a deallocation or
+        * if it names the same <db, rel, block>; anything else is reported
+        * through _bm_die.  The walk ends once the slot at head itself has
+        * been examined.
+        */
+       for (;;)
+       {
+               entry = &TraceBuf[pos];
+               if (entry->bmt_op != BMT_NOTUSED && entry->bmt_buf == bufNo)
+               {
+                       if (entry->bmt_op == BMT_DEALLOC ||
+                               (entry->bmt_dbid == dbId && entry->bmt_relid == relId &&
+                                entry->bmt_blkno == blkNo))
+                               break;
+
+                       /* die holding the buffer lock */
+                       _bm_die(dbId, relId, blkNo, bufNo, allocType, head, pos);
+               }
+
+               if (pos == head)
+                       break;
+
+               pos = (pos == 0) ? BMT_LIMIT - 1 : pos - 1;
+       }
+
+       /* record this operation in the current slot and advance the ring */
+       entry = &TraceBuf[head];
+       entry->bmt_pid = MyProcPid;
+       entry->bmt_buf = bufNo;
+       entry->bmt_dbid = dbId;
+       entry->bmt_relid = relId;
+       entry->bmt_blkno = blkNo;
+       entry->bmt_op = allocType;
+
+       *CurTraceBuf = (head + 1) % BMT_LIMIT;
+}
+
+/* Write the textual name of a trace op code, followed by a newline, to fp. */
+static void
+bmt_print_op(FILE *fp, int op)
+{
+       switch (op)
+       {
+               case BMT_ALLOCFND:
+                       fprintf(fp, "allocate (found)\n");
+                       break;
+
+               case BMT_ALLOCNOTFND:
+                       fprintf(fp, "allocate (not found)\n");
+                       break;
+
+               case BMT_DEALLOC:
+                       fprintf(fp, "deallocate\n");
+                       break;
+
+               default:
+                       fprintf(fp, "unknown op type %d\n", op);
+                       break;
+       }
+}
+
+/*
+ *     Dump the trace ring and the offending operation to
+ *     /tmp/death_notice, then send SIGILL to our own process.
+ *     start is the current ring position, cur the conflicting entry.
+ */
+_bm_die(Oid dbId, Oid relId, int blkNo, int bufNo,
+               int allocType, long start, long cur)
+{
+       FILE       *out;
+       bmtrace    *entry;
+       int                     slot;
+
+       entry = &TraceBuf[cur];
+
+       out = AllocateFile("/tmp/death_notice", "w");
+       if (out == NULL)
+               elog(FATAL, "buffer alloc trace error and can't open log file");
+
+       fprintf(out, "buffer alloc trace detected the following error:\n\n");
+       fprintf(out, "    buffer %d being %s inconsistently with a previous %s\n\n",
+                       bufNo,
+                       (allocType == BMT_DEALLOC ? "deallocated" : "allocated"),
+                       (entry->bmt_op == BMT_DEALLOC ? "deallocation" : "allocation"));
+
+       fprintf(out, "the trace buffer contains:\n");
+
+       /* one full trip around the ring, starting at the current position */
+       slot = start;
+       do
+       {
+               entry = &TraceBuf[slot];
+               if (entry->bmt_op != BMT_NOTUSED)
+               {
+                       fprintf(out, "     [%3d]%spid %d buf %2d for <%d,%u,%d> ",
+                                       slot, (slot == cur ? " ---> " : "\t"),
+                                       entry->bmt_pid, entry->bmt_buf,
+                                       entry->bmt_dbid, entry->bmt_relid, entry->bmt_blkno);
+                       bmt_print_op(out, entry->bmt_op);
+               }
+
+               slot = (slot + 1) % BMT_LIMIT;
+       } while (slot != start);
+
+       fprintf(out, "\noperation causing error:\n");
+       fprintf(out, "\tpid %d buf %d for <%d,%u,%d> ",
+                       getpid(), bufNo, dbId, relId, blkNo);
+       bmt_print_op(out, allocType);
+
+       FreeFile(out);
+
+       kill(getpid(), SIGILL);
+}
+
+#endif  /* BMTRACE */
+
+/*
+ * SetBufferCommitInfoNeedsSave
+ *
+ *     Mark a buffer dirty when we have updated tuple commit-status bits in it.
+ *
+ * This is similar to WriteNoReleaseBuffer, except that we do not set
+ * SharedBufferChanged or BufferDirtiedByMe, because we have not made a
+ * critical change that has to be flushed to disk before xact commit --- the
+ * status-bit update could be redone by someone else just as easily.  The
+ * buffer will be marked dirty, but it will not be written to disk until
+ * there is another reason to write it.
+ *
+ * This routine might get called many times on the same page, if we are making
+ * the first scan after commit of an xact that added/deleted many tuples.
+ * So, be as quick as we can if the buffer is already dirty.
+ */
+void
+SetBufferCommitInfoNeedsSave(Buffer buffer)
+{
+       BufferDesc *desc;
+
+       /* nothing to do for local buffers or out-of-range buffer numbers */
+       if (BufferIsLocal(buffer) || BAD_BUFFER_ID(buffer))
+               return;
+
+       desc = &BufferDescriptors[buffer - 1];
+
+       /*
+        * Quick exit: both dirty bits are already set.  This peek at the
+        * flags is done without taking BufMgrLock.
+        */
+       if ((desc->flags & (BM_DIRTY | BM_JUST_DIRTIED)) ==
+               (BM_DIRTY | BM_JUST_DIRTIED))
+               return;
+
+       SpinAcquire(BufMgrLock);
+       Assert(desc->refcount > 0);
+       desc->flags |= (BM_DIRTY | BM_JUST_DIRTIED);
+       SpinRelease(BufMgrLock);
+}
+
+/*
+ * UnlockBuffers
+ *
+ *     Give up every buffer context lock this backend has recorded in
+ *     BufferLocks[]: for each buffer with a nonzero entry, undo the
+ *     R, RI and W bits on the shared descriptor and clear the entry.
+ *     The comment on the RI case below indicates this can be reached
+ *     from elog(ERROR) processing.
+ */
+void
+UnlockBuffers()
+{
+       BufferDesc *buf;
+       int                     i;
+
+       for (i = 0; i < NBuffers; i++)
+       {
+               /* we hold no context lock on this buffer */
+               if (BufferLocks[i] == 0)
+                       continue;
+
+               Assert(BufferIsValid(i + 1));
+               buf = &(BufferDescriptors[i]);
+
+               /*
+                * Protect the descriptor's lock fields: per-buffer spinlock when
+                * test-and-set is available, otherwise the WaitCLSemId semaphore.
+                */
+#ifdef HAS_TEST_AND_SET
+               S_LOCK(&(buf->cntx_lock));
+#else
+               IpcSemaphoreLock(WaitCLSemId, 0, IpcExclusiveLock);
+#endif
+
+               if (BufferLocks[i] & BL_R_LOCK)
+               {
+                       Assert(buf->r_locks > 0);
+                       (buf->r_locks)--;
+               }
+               if (BufferLocks[i] & BL_RI_LOCK)
+               {
+
+                       /*
+                        * Someone else could remove our RI lock when acquiring W
+                        * lock. This is possible if we came here from elog(ERROR)
+                        * from IpcSemaphore{Lock|Unlock}(WaitCLSemId). And so we
+                        * don't do Assert(buf->ri_lock) here.
+                        */
+                       buf->ri_lock = false;
+               }
+               if (BufferLocks[i] & BL_W_LOCK)
+               {
+                       Assert(buf->w_lock);
+                       buf->w_lock = false;
+               }
+#ifdef HAS_TEST_AND_SET
+               S_UNLOCK(&(buf->cntx_lock));
+#else
+               IpcSemaphoreUnlock(WaitCLSemId, 0, IpcExclusiveLock);
+#endif
+               /* forget all of our lock bits for this buffer */
+               BufferLocks[i] = 0;
+       }
+}
+
+void
+LockBuffer(Buffer buffer, int mode)
+{
+       BufferDesc *buf;
+       bits8      *buflock;
+
+       Assert(BufferIsValid(buffer));
+       if (BufferIsLocal(buffer))
+               return;
+
+       buf = &(BufferDescriptors[buffer - 1]);
+       buflock = &(BufferLocks[buffer - 1]);
+
+#ifdef HAS_TEST_AND_SET
+       S_LOCK(&(buf->cntx_lock));
+#else
+       IpcSemaphoreLock(WaitCLSemId, 0, IpcExclusiveLock);
+#endif
+
+       if (mode == BUFFER_LOCK_UNLOCK)
+       {
+               if (*buflock & BL_R_LOCK)
+               {
+                       Assert(buf->r_locks > 0);
+                       Assert(!(buf->w_lock));
+                       Assert(!(*buflock & (BL_W_LOCK | BL_RI_LOCK)));
+                       (buf->r_locks)--;
+                       *buflock &= ~BL_R_LOCK;
+               }
+               else if (*buflock & BL_W_LOCK)
+               {
+                       Assert(buf->w_lock);
+                       Assert(buf->r_locks == 0);
+                       Assert(!(*buflock & (BL_R_LOCK | BL_RI_LOCK)));
+                       buf->w_lock = false;
+                       *buflock &= ~BL_W_LOCK;
+               }
+               else
+                       elog(ERROR, "UNLockBuffer: buffer %lu is not locked", buffer);
+       }
+       else if (mode == BUFFER_LOCK_SHARE)
+       {
+               unsigned        i = 0;
+
+               Assert(!(*buflock & (BL_R_LOCK | BL_W_LOCK | BL_RI_LOCK)));
+               while (buf->ri_lock || buf->w_lock)
+               {
+#ifdef HAS_TEST_AND_SET
+                       S_UNLOCK(&(buf->cntx_lock));
+                       s_lock_sleep(i++);
+                       S_LOCK(&(buf->cntx_lock));
+#else
+                       IpcSemaphoreUnlock(WaitCLSemId, 0, IpcExclusiveLock);
+                       s_lock_sleep(i++);
+                       IpcSemaphoreLock(WaitCLSemId, 0, IpcExclusiveLock);
+#endif
+               }
+               (buf->r_locks)++;
+               *buflock |= BL_R_LOCK;
+       }
+       else if (mode == BUFFER_LOCK_EXCLUSIVE)
+       {
+               unsigned        i = 0;
+
+               Assert(!(*buflock & (BL_R_LOCK | BL_W_LOCK | BL_RI_LOCK)));
+               while (buf->r_locks > 0 || buf->w_lock)
+               {
+                       if (buf->r_locks > 3 || (*buflock & BL_RI_LOCK))
+                       {
+
+                               /*
+                                * Our RI lock might be removed by concurrent W lock
+                                * acquiring (see what we do with RI locks below when our
+                                * own W acquiring succeeded) and so we set RI lock again
+                                * if we already did this.
+                                */
+                               *buflock |= BL_RI_LOCK;
+                               buf->ri_lock = true;
+                       }
+#ifdef HAS_TEST_AND_SET
+                       S_UNLOCK(&(buf->cntx_lock));
+                       s_lock_sleep(i++);
+                       S_LOCK(&(buf->cntx_lock));
+#else
+                       IpcSemaphoreUnlock(WaitCLSemId, 0, IpcExclusiveLock);
+                       s_lock_sleep(i++);
+                       IpcSemaphoreLock(WaitCLSemId, 0, IpcExclusiveLock);
+#endif
+               }
+               buf->w_lock = true;
+               *buflock |= BL_W_LOCK;
+
+               buf->cntxDirty = true;
+
+               if (*buflock & BL_RI_LOCK)
+               {
+
+                       /*
+                        * It's possible to remove RI locks acquired by another W
+                        * lockers here, but they'll take care about it.
+                        */
+                       buf->ri_lock = false;
+                       *buflock &= ~BL_RI_LOCK;
+               }
+       }
+       else
+               elog(ERROR, "LockBuffer: unknown lock mode %d", mode);
+
+#ifdef HAS_TEST_AND_SET
+       S_UNLOCK(&(buf->cntx_lock));
+#else
+       IpcSemaphoreUnlock(WaitCLSemId, 0, IpcExclusiveLock);
+#endif
+
+}
+
+/*
+ *     Functions for IO error handling
+ *
+ *     Note: we assume that nested buffer IO never occurs,
+ *     i.e. at most one io_in_progress spinlock is held
+ *     per process.
+*/
+static BufferDesc *InProgressBuf = (BufferDesc *) NULL;
+static bool IsForInput;
+
+/*
+ * Function:StartBufferIO
+ *     Mark the buffer as having I/O in progress and record it as this
+ *     backend's in-progress buffer (InProgressBuf / IsForInput).
+ *
+ *     buf      - descriptor of the buffer the I/O is about to start on
+ *     forInput - stored in IsForInput; true for a read, false for a write
+ *
+ *     (Assumptions)
+ *     My process is executing no IO
+ *     BufMgrLock is held
+ *     BM_IO_IN_PROGRESS mask is not set for the buffer
+ *     The buffer is Pinned
+ *
+*/
+static void
+StartBufferIO(BufferDesc *buf, bool forInput)
+{
+       Assert(!InProgressBuf);
+       Assert(!(buf->flags & BM_IO_IN_PROGRESS));
+       buf->flags |= BM_IO_IN_PROGRESS;
+#ifdef HAS_TEST_AND_SET
+
+       /*
+        * There used to be
+        *
+        * Assert(S_LOCK_FREE(&(buf->io_in_progress_lock)));
+        *
+        * here, but that's wrong because of the way WaitIO works: someone else
+        * waiting for the I/O to complete will succeed in grabbing the lock
+        * for a few instructions, and if we context-swap back to here the
+        * Assert could fail.  Tiny window for failure, but I've seen it
+        * happen -- tgl
+        */
+       S_LOCK(&(buf->io_in_progress_lock));
+#endif  /* HAS_TEST_AND_SET */
+       /* remember what we are doing, for AbortBufferIO */
+       InProgressBuf = buf;
+       IsForInput = forInput;
+}
+
+/*
+ * Function:TerminateBufferIO -- release io_in_progress_lock (or SignalIO) and forget buf
+ *     (Assumptions)
+ *     My process is executing IO for the buffer
+ *     BufMgrLock is held
+ *     The buffer is Pinned
+ *     (buf->flags is not touched here; AbortBufferIO clears BM_IO_IN_PROGRESS itself)
+*/
+static void
+TerminateBufferIO(BufferDesc *buf)
+{
+       Assert(buf == InProgressBuf);
+#ifdef HAS_TEST_AND_SET
+       S_UNLOCK(&(buf->io_in_progress_lock));
+#else
+       if (buf->refcount > 1)  /* presumably someone else may be waiting -- TODO confirm */
+               SignalIO(buf);
+#endif  /* HAS_TEST_AND_SET */
+       InProgressBuf = (BufferDesc *) 0;       /* this process has no IO in progress now */
+}
+
+/*
+ * Function:ContinueBufferIO -- update IsForInput for the IO already in progress
+ *     (Assumptions)
+ *     My process is executing IO for the buffer
+ *     BufMgrLock is held
+ *     The buffer is Pinned
+ *     (only IsForInput is updated; no flags or locks are changed)
+*/
+static void
+ContinueBufferIO(BufferDesc *buf, bool forInput)
+{
+       Assert(buf == InProgressBuf);
+       Assert(buf->flags & BM_IO_IN_PROGRESS);
+       IsForInput = forInput;
+}
+
+#ifdef NOT_USED
+void
+InitBufferIO(void)
+{
+       InProgressBuf = (BufferDesc *) 0;       /* forget any buffer recorded as having IO in progress */
+}
+#endif
+
+/*
+ *     This function is called from ProcReleaseSpins().
+ *     BufMgrLock isn't held when this function is called.
+ *     BM_IO_ERROR is always set. If BM_IO_ERROR was already
+ *     set in case of output, a NOTICE that the write error
+ *     may be permanent is emitted and the buffer stays dirty.
+ */
+void
+AbortBufferIO(void)
+{
+       BufferDesc *buf = InProgressBuf;
+
+       if (buf)        /* nothing to do unless an IO was recorded as in progress */
+       {
+               Assert(buf->flags & BM_IO_IN_PROGRESS);
+               SpinAcquire(BufMgrLock);        /* TerminateBufferIO expects BufMgrLock to be held */
+               if (IsForInput)
+                       Assert(!(buf->flags & BM_DIRTY) && !(buf->cntxDirty));
+               else
+               {
+                       Assert(buf->flags & BM_DIRTY || buf->cntxDirty);
+                       if (buf->flags & BM_IO_ERROR)   /* an earlier IO on this buffer already failed */
+                       {
+                               elog(NOTICE, "write error may be permanent: cannot write block %u for %s/%s",
+                               buf->tag.blockNum, buf->blind.dbname, buf->blind.relname);
+                       }
+                       buf->flags |= BM_DIRTY; /* output was aborted: make sure the buffer is still flagged dirty */
+               }
+               buf->flags |= BM_IO_ERROR;
+               buf->flags &= ~BM_IO_IN_PROGRESS;
+               TerminateBufferIO(buf);
+               SpinRelease(BufMgrLock);
+       }
+}
+
+/*
+ * Cleanup buffer or mark it for cleanup. Buffer may be cleaned
+ * up if it's pinned only once.
+ * In every path the buffer is unlocked and one local pin is dropped here.
+ * NOTE: buffer must be excl locked.
+ */
+void
+MarkBufferForCleanup(Buffer buffer, void (*CleanupFunc)(Buffer))
+{
+       BufferDesc *bufHdr = &BufferDescriptors[buffer - 1];
+
+       Assert(PrivateRefCount[buffer - 1] > 0);
+
+       if (PrivateRefCount[buffer - 1] > 1)    /* this backend holds other pins: only mark the buffer */
+       {
+               LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
+               PrivateRefCount[buffer - 1]--;  /* only the local count drops; bufHdr->refcount is left alone */
+               SpinAcquire(BufMgrLock);
+               Assert(bufHdr->refcount > 0);
+               bufHdr->flags |= (BM_DIRTY | BM_JUST_DIRTIED);
+               bufHdr->CleanupFunc = CleanupFunc;      /* cleanup is deferred */
+               SpinRelease(BufMgrLock);
+               return;
+       }
+
+       SpinAcquire(BufMgrLock);
+       Assert(bufHdr->refcount > 0);
+       if (bufHdr->refcount == 1)      /* shared refcount of 1: clean up right away */
+       {
+               SpinRelease(BufMgrLock);
+               CleanupFunc(buffer);
+               CleanupFunc = NULL;     /* done, so NULL is what gets stored in bufHdr below */
+       }
+       else
+               SpinRelease(BufMgrLock);
+
+       LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
+       PrivateRefCount[buffer - 1]--;
+
+       SpinAcquire(BufMgrLock);
+       Assert(bufHdr->refcount > 0);
+       bufHdr->flags |= (BM_DIRTY | BM_JUST_DIRTIED);
+       bufHdr->CleanupFunc = CleanupFunc;
+       bufHdr->refcount--;     /* our last local pin is gone, so release the shared one too */
+       if (bufHdr->refcount == 0)
+       {
+               AddBufferToFreelist(bufHdr);
+               bufHdr->flags |= BM_FREE;
+       }
+       SpinRelease(BufMgrLock);
+       return;
+}
diff --git a/src/backend/storage/buffer/xlog_localbuf.c b/src/backend/storage/buffer/xlog_localbuf.c
new file mode 100644 (file)
index 0000000..cb14a32
--- /dev/null
@@ -0,0 +1,274 @@
+/*-------------------------------------------------------------------------
+ *
+ * xlog_localbuf.c
+ *       local buffer manager. Fast buffer manager for temporary tables
+ *       or special cases when the operation is not visible to other backends.
+ *
+ *       When a relation is being created, the descriptor will have rd_islocal
+ *       set to indicate that the local buffer manager should be used. During
+ *       the same transaction the relation is being created, any inserts or
+ *       selects from the newly created relation will use the local buffer
+ *       pool. rd_islocal is reset at the end of a transaction (commit/abort).
+ *       This is useful for queries like SELECT INTO TABLE and create index.
+ *
+ * Portions Copyright (c) 1996-2000, PostgreSQL, Inc
+ * Portions Copyright (c) 1994-5, Regents of the University of California
+ *
+ *
+ * IDENTIFICATION
+ *       $Header: /cvsroot/pgsql/src/backend/storage/buffer/Attic/xlog_localbuf.c,v 1.1 2000/10/28 16:20:56 vadim Exp $
+ *
+ *-------------------------------------------------------------------------
+ */
+#include <sys/types.h>
+#include <sys/file.h>
+#include <math.h>
+#include <signal.h>
+
+#include "postgres.h"
+
+#include "executor/execdebug.h"
+#include "storage/smgr.h"
+#include "utils/relcache.h"
+
+extern long int LocalBufferFlushCount; /* incremented for every local buffer written out */
+
+int                    NLocBuffer = 64;        /* number of local buffers */
+BufferDesc *LocalBufferDescriptors = NULL;     /* NLocBuffer descriptors, allocated by InitLocalBuffer() */
+long      *LocalRefCount = NULL;       /* per-buffer pin counts, allocated by InitLocalBuffer() */
+
+static int     nextFreeLocalBuf = 0;   /* where the round-robin search for a free buffer starts */
+
+/*#define LBDEBUG*/
+
+/*
+ * LocalBufferAlloc -
+ *       allocate a local buffer (round robin for now); *foundPtr tells whether the block was already cached.
+ */
+BufferDesc *
+LocalBufferAlloc(Relation reln, BlockNumber blockNum, bool *foundPtr)
+{
+       int                     i;
+       BufferDesc *bufHdr = (BufferDesc *) NULL;
+
+       if (blockNum == P_NEW)
+       {
+               blockNum = reln->rd_nblocks;    /* P_NEW: use the current block count as the new block number */
+               reln->rd_nblocks++;
+       }
+
+       /* a low tech search for now -- not optimized for scans */
+       for (i = 0; i < NLocBuffer; i++)
+       {
+               if (LocalBufferDescriptors[i].tag.rnode.relNode == 
+                       reln->rd_node.relNode &&
+                       LocalBufferDescriptors[i].tag.blockNum == blockNum)
+               {
+
+#ifdef LBDEBUG
+                       fprintf(stderr, "LB ALLOC (%u,%d) %d\n",
+                                       RelationGetRelid(reln), blockNum, -i - 1);
+#endif
+                       LocalRefCount[i]++;     /* hit: pin the existing buffer */
+                       *foundPtr = TRUE;
+                       return &LocalBufferDescriptors[i];
+               }
+       }
+
+#ifdef LBDEBUG
+       fprintf(stderr, "LB ALLOC (%u,%d) %d\n",
+                       RelationGetRelid(reln), blockNum, -nextFreeLocalBuf - 1);
+#endif
+
+       /* need to get a new buffer (round robin for now) */
+       for (i = 0; i < NLocBuffer; i++)
+       {
+               int                     b = (nextFreeLocalBuf + i) % NLocBuffer;
+
+               if (LocalRefCount[b] == 0)      /* first unpinned buffer wins */
+               {
+                       bufHdr = &LocalBufferDescriptors[b];
+                       LocalRefCount[b]++;
+                       nextFreeLocalBuf = (b + 1) % NLocBuffer;
+                       break;
+               }
+       }
+       if (bufHdr == NULL)     /* every local buffer is pinned */
+               elog(ERROR, "no empty local buffer.");
+
+       /*
+        * this buffer is not referenced but it might still be dirty (the last
+        * transaction to touch it doesn't need its contents but has not
+        * flushed it).  if that's the case, write it out before reusing it!
+        */
+       if (bufHdr->flags & BM_DIRTY || bufHdr->cntxDirty)
+       {
+               Relation        bufrel = RelationNodeCacheGetRelation(bufHdr->tag.rnode);
+
+               Assert(bufrel != NULL);
+
+               /* flush this page */
+               smgrwrite(DEFAULT_SMGR, bufrel, bufHdr->tag.blockNum,
+                                 (char *) MAKE_PTR(bufHdr->data));
+               LocalBufferFlushCount++;
+
+               /*
+                * drop relcache refcount incremented by
+                * RelationNodeCacheGetRelation
+                */
+               RelationDecrementReferenceCount(bufrel);
+       }
+
+       /*
+        * it's all ours now.
+        *
+        * We don't need tblNode currently, but I think we will in future,
+        * when we'll give up rel->rd_fd to fmgr cache.
+        */
+       bufHdr->tag.rnode = reln->rd_node;
+       bufHdr->tag.blockNum = blockNum;
+       bufHdr->flags &= ~BM_DIRTY;
+       bufHdr->cntxDirty = false;
+
+       /*
+        * lazy memory allocation. (see MAKE_PTR for why we need to do
+        * MAKE_OFFSET.)
+        */
+       if (bufHdr->data == (SHMEM_OFFSET) 0)
+       {
+               char       *data = (char *) malloc(BLCKSZ);     /* NOTE(review): malloc result is not checked */
+
+               bufHdr->data = MAKE_OFFSET(data);
+       }
+
+       *foundPtr = FALSE;
+       return bufHdr;
+}
+
+/*
+ * WriteLocalBuffer -
+ *       marks a local buffer dirty (no I/O is done here) and unpins it if release is set; always returns true
+ */
+int
+WriteLocalBuffer(Buffer buffer, bool release)
+{
+       int                     bufid;
+
+       Assert(BufferIsLocal(buffer));
+
+#ifdef LBDEBUG
+       fprintf(stderr, "LB WRITE %d\n", buffer);
+#endif
+
+       bufid = -(buffer + 1);  /* local Buffer numbers are negative: -1 maps to array index 0 */
+       LocalBufferDescriptors[bufid].flags |= BM_DIRTY;
+
+       if (release)
+       {
+               Assert(LocalRefCount[bufid] > 0);
+               LocalRefCount[bufid]--;
+       }
+
+       return true;
+}
+
+/*
+ * InitLocalBuffer -
+ *       init the local buffer cache. Since most queries (esp. multi-user ones)
+ *       don't involve local buffers, we delay allocating memory for the actual
+ *       buffer until we need it.
+ */
+void
+InitLocalBuffer(void)
+{
+       int                     i;
+
+       /*
+        * these aren't going away. I'm not gonna use palloc.
+        */
+       LocalBufferDescriptors =
+               (BufferDesc *) malloc(sizeof(BufferDesc) * NLocBuffer); /* NOTE(review): result not checked before MemSet */
+       MemSet(LocalBufferDescriptors, 0, sizeof(BufferDesc) * NLocBuffer);
+       nextFreeLocalBuf = 0;
+
+       for (i = 0; i < NLocBuffer; i++)
+       {
+               BufferDesc *buf = &LocalBufferDescriptors[i];
+
+               /*
+                * negative to indicate local buffer. This is tricky: shared
+                * buffers start with 0. We have to start with -2. (Note that the
+                * routine BufferDescriptorGetBuffer adds 1 to buf_id so our first
+                * buffer id is -1.)
+                */
+               buf->buf_id = -i - 2;
+       }
+
+       LocalRefCount = (long *) malloc(sizeof(long) * NLocBuffer);     /* NOTE(review): result not checked before MemSet */
+       MemSet(LocalRefCount, 0, sizeof(long) * NLocBuffer);
+}
+
+/*
+ * LocalBufferSync
+ *
+ * Flush all dirty buffers in the local buffer cache at commit time.
+ * Since the buffer cache is only used for keeping relations visible
+ * during a transaction, we will not need these buffers again.
+ *
+ * Note that we have to *flush* local buffers because they are not
+ * visible to checkpoint makers. But we can skip XLOG flush check.
+ */
+void
+LocalBufferSync(void)
+{
+       int                     i;
+
+       for (i = 0; i < NLocBuffer; i++)
+       {
+               BufferDesc *buf = &LocalBufferDescriptors[i];
+               Relation        bufrel;
+
+               if (buf->flags & BM_DIRTY || buf->cntxDirty)
+               {
+#ifdef LBDEBUG
+                       fprintf(stderr, "LB SYNC %d\n", -i - 1);
+#endif
+                       bufrel = RelationNodeCacheGetRelation(buf->tag.rnode);
+
+                       Assert(bufrel != NULL);
+
+                       smgrwrite(DEFAULT_SMGR, bufrel, buf->tag.blockNum,
+                                               (char *) MAKE_PTR(buf->data));
+                       smgrmarkdirty(DEFAULT_SMGR, bufrel, buf->tag.blockNum);
+                       LocalBufferFlushCount++;
+
+                       /* drop relcache refcount from RelationNodeCacheGetRelation */
+                       RelationDecrementReferenceCount(bufrel);
+
+                       buf->flags &= ~BM_DIRTY;
+                       buf->cntxDirty = false;
+               }
+       }
+
+       MemSet(LocalRefCount, 0, sizeof(long) * NLocBuffer);    /* all local pin counts are reset */
+       nextFreeLocalBuf = 0;
+}
+
+void
+ResetLocalBufferPool(void)
+{
+       int                     i;
+
+       for (i = 0; i < NLocBuffer; i++)
+       {
+               BufferDesc *buf = &LocalBufferDescriptors[i];
+
+               buf->tag.rnode.relNode = InvalidOid;    /* tag no longer names a relation */
+               buf->flags &= ~BM_DIRTY;        /* dirty state is dropped; nothing is written out */
+               buf->cntxDirty = false;
+               buf->buf_id = -i - 2;   /* same numbering as InitLocalBuffer() */
+       }
+
+       MemSet(LocalRefCount, 0, sizeof(long) * NLocBuffer);    /* all local pin counts are reset */
+       nextFreeLocalBuf = 0;
+}
index 128c49989a060f703cbac97f3554e8921b01ab34..84c4e76c09d626cb2b95cf3e8021bc9b15007c5e 100644 (file)
@@ -7,7 +7,7 @@
  * Portions Copyright (c) 1994, Regents of the University of California
  *
  * IDENTIFICATION
- *       $Header: /cvsroot/pgsql/src/backend/storage/file/fd.c,v 1.64 2000/10/02 19:42:47 petere Exp $
+ *       $Header: /cvsroot/pgsql/src/backend/storage/file/fd.c,v 1.65 2000/10/28 16:20:56 vadim Exp $
  *
  * NOTES:
  *
@@ -823,8 +823,10 @@ FileWrite(File file, char *buffer, int amount)
        if (returnCode > 0)
        {
                VfdCache[file].seekPos += returnCode;
+#ifndef XLOG
                /* mark the file as needing fsync */
                VfdCache[file].fdstate |= FD_DIRTY;
+#endif
        }
        else
                VfdCache[file].seekPos = FileUnknownPos;
index ff8b4ce52fe74c76bfbed51ef7fd6939a34dd532..da466afe9f8aefa5ed3c70cd616f887672cebee4 100644 (file)
@@ -8,7 +8,7 @@
  *
  *
  * IDENTIFICATION
- *       $Header: /cvsroot/pgsql/src/backend/storage/smgr/md.c,v 1.76 2000/10/20 11:01:11 vadim Exp $
+ *       $Header: /cvsroot/pgsql/src/backend/storage/smgr/md.c,v 1.77 2000/10/28 16:20:57 vadim Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -76,12 +76,7 @@ static int   _mdfd_getrelnfd(Relation reln);
 static MdfdVec *_mdfd_openseg(Relation reln, int segno, int oflags);
 static MdfdVec *_mdfd_getseg(Relation reln, int blkno);
 
-#ifdef OLD_FILE_NAMING
-static int _mdfd_blind_getseg(char *dbname, char *relname,
-                                  Oid dbid, Oid relid, int blkno);
-#else
 static int _mdfd_blind_getseg(RelFileNode rnode, int blkno);
-#endif
 
 static int     _fdvec_alloc(void);
 static void _fdvec_free(int);
@@ -134,11 +129,7 @@ mdcreate(Relation reln)
 
        Assert(reln->rd_unlinked && reln->rd_fd < 0);
 
-#ifdef OLD_FILE_NAMING
-       path = relpath(RelationGetPhysicalRelationName(reln));
-#else
        path = relpath(reln->rd_node);
-#endif
        fd = FileNameOpenFile(path, O_RDWR | O_CREAT | O_EXCL | PG_BINARY, 0600);
 
        /*
@@ -336,11 +327,7 @@ mdopen(Relation reln)
        int                     vfd;
 
        Assert(reln->rd_fd < 0);
-#ifdef OLD_FILE_NAMING
-       path = relpath(RelationGetPhysicalRelationName(reln));
-#else
        path = relpath(reln->rd_node);
-#endif
 
        fd = FileNameOpenFile(path, O_RDWR | PG_BINARY, 0600);
        if (fd < 0)
@@ -579,30 +566,16 @@ mdflush(Relation reln, BlockNumber blocknum, char *buffer)
  *             the file, making it more like mdflush().
  */
 int
-#ifdef OLD_FILE_NAMING
-mdblindwrt(char *dbname,
-                  char *relname,
-                  Oid dbid,
-                  Oid relid,
-                  BlockNumber blkno,
-                  char *buffer,
-                  bool dofsync)
-#else
 mdblindwrt(RelFileNode rnode,
                   BlockNumber blkno,
                   char *buffer,
                   bool dofsync)
-#endif
 {
        int                     status;
        long            seekpos;
        int                     fd;
 
-#ifdef OLD_FILE_NAMING
-       fd = _mdfd_blind_getseg(dbname, relname, dbid, relid, blkno);
-#else
        fd = _mdfd_blind_getseg(rnode, blkno);
-#endif
 
        if (fd < 0)
                return SM_FAIL;
@@ -676,25 +649,13 @@ mdmarkdirty(Relation reln, BlockNumber blkno)
  *             rather than building md/fd datastructures to postpone it till later.
  */
 int
-#ifdef OLD_FILE_NAMING
-mdblindmarkdirty(char *dbname,
-                                char *relname,
-                                Oid dbid,
-                                Oid relid,
-                                BlockNumber blkno)
-#else
 mdblindmarkdirty(RelFileNode rnode,
                                 BlockNumber blkno)
-#endif
 {
        int                     status;
        int                     fd;
 
-#ifdef OLD_FILE_NAMING
-       fd = _mdfd_blind_getseg(dbname, relname, dbid, relid, blkno);
-#else
        fd = _mdfd_blind_getseg(rnode, blkno);
-#endif
 
        if (fd < 0)
                return SM_FAIL;
@@ -915,6 +876,22 @@ mdabort()
        return SM_SUCCESS;
 }
 
+#ifdef XLOG
+/*
+ *     mdsync() -- Sync storage.
+ *     Calls sync() twice, sleeping 2 seconds in between when IsUnderPostmaster; always returns SM_SUCCESS.
+ */
+int
+mdsync()
+{
+       sync();
+       if (IsUnderPostmaster)
+               sleep(2);
+       sync();
+       return SM_SUCCESS;
+}
+#endif
+
 /*
  *     _fdvec_alloc () -- grab a free (or new) md file descriptor vector.
  *
@@ -996,11 +973,7 @@ _mdfd_openseg(Relation reln, int segno, int oflags)
                           *fullpath;
 
        /* be sure we have enough space for the '.segno', if any */
-#ifdef OLD_FILE_NAMING
-       path = relpath(RelationGetPhysicalRelationName(reln));
-#else
        path = relpath(reln->rd_node);
-#endif
 
        if (segno > 0)
        {
@@ -1115,12 +1088,7 @@ _mdfd_getseg(Relation reln, int blkno)
  */
 
 static int
-#ifdef OLD_FILE_NAMING
-_mdfd_blind_getseg(char *dbname, char *relname, Oid dbid, Oid relid,
-                                  int blkno)
-#else
 _mdfd_blind_getseg(RelFileNode rnode, int blkno)
-#endif
 {
        char       *path;
        int                     fd;
@@ -1130,12 +1098,7 @@ _mdfd_blind_getseg(RelFileNode rnode, int blkno)
 
 #endif
 
-#ifdef OLD_FILE_NAMING
-       /* construct the path to the relation */
-       path = relpath_blind(dbname, relname, dbid, relid);
-#else
        path = relpath(rnode);
-#endif
 
 #ifndef LET_OS_MANAGE_FILESIZE
        /* append the '.segno', if needed */
index 65bc5595a8595a75d351e7a990c22224a6240eb4..d2a940a76e530bd410659e9da88ff702dee871a6 100644 (file)
@@ -11,7 +11,7 @@
  *
  *
  * IDENTIFICATION
- *       $Header: /cvsroot/pgsql/src/backend/storage/smgr/smgr.c,v 1.41 2000/10/21 15:43:31 vadim Exp $
+ *       $Header: /cvsroot/pgsql/src/backend/storage/smgr/smgr.c,v 1.42 2000/10/28 16:20:57 vadim Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -36,27 +36,17 @@ typedef struct f_smgr
                                                                                   char *buffer);
        int                     (*smgr_flush) (Relation reln, BlockNumber blocknum,
                                                                                   char *buffer);
-#ifdef OLD_FILE_NAMING
-       int                     (*smgr_blindwrt) (char *dbname, char *relname,
-                                                                                         Oid dbid, Oid relid,
-                                                                                BlockNumber blkno, char *buffer,
-                                                                                         bool dofsync);
-#else
        int                     (*smgr_blindwrt) (RelFileNode rnode, BlockNumber blkno, 
                                                                                char *buffer, bool dofsync);
-#endif
        int                     (*smgr_markdirty) (Relation reln, BlockNumber blkno);
-#ifdef OLD_FILE_NAMING
-       int                     (*smgr_blindmarkdirty) (char *dbname, char *relname,
-                                                                                                       Oid dbid, Oid relid,
-                                                                                                       BlockNumber blkno);
-#else
        int                     (*smgr_blindmarkdirty) (RelFileNode, BlockNumber blkno);
-#endif
        int                     (*smgr_nblocks) (Relation reln);
        int                     (*smgr_truncate) (Relation reln, int nblocks);
        int                     (*smgr_commit) (void);  /* may be NULL */
        int                     (*smgr_abort) (void);   /* may be NULL */
+#ifdef XLOG
+       int                     (*smgr_sync) (void);
+#endif
 } f_smgr;
 
 /*
@@ -69,7 +59,11 @@ static f_smgr smgrsw[] = {
        /* magnetic disk */
        {mdinit, NULL, mdcreate, mdunlink, mdextend, mdopen, mdclose,
                mdread, mdwrite, mdflush, mdblindwrt, mdmarkdirty, mdblindmarkdirty,
+#ifdef XLOG
+       mdnblocks, mdtruncate, mdcommit, mdabort, mdsync},
+#else
        mdnblocks, mdtruncate, mdcommit, mdabort},
+#endif
 
 #ifdef STABLE_MEMORY_STORAGE
        /* main memory */
@@ -310,40 +304,6 @@ smgrflush(int16 which, Relation reln, BlockNumber blocknum, char *buffer)
  *             this page down to stable storage in this circumstance.  The
  *             write should be synchronous if dofsync is true.
  */
-#ifdef OLD_FILE_NAMING
-int
-smgrblindwrt(int16 which,
-                        char *dbname,
-                        char *relname,
-                        Oid dbid,
-                        Oid relid,
-                        BlockNumber blkno,
-                        char *buffer,
-                        bool dofsync)
-{
-       char       *dbstr;
-       char       *relstr;
-       int                     status;
-
-       /* strdup here is probably redundant */
-       dbstr = pstrdup(dbname);
-       relstr = pstrdup(relname);
-
-       status = (*(smgrsw[which].smgr_blindwrt)) (dbstr, relstr, dbid, relid,
-                                                                                          blkno, buffer, dofsync);
-
-       if (status == SM_FAIL)
-               elog(ERROR, "cannot write block %d of %s [%s] blind: %m",
-                        blkno, relstr, dbstr);
-
-       pfree(dbstr);
-       pfree(relstr);
-
-       return status;
-}
-
-#else
-
 int
 smgrblindwrt(int16 which,
                         RelFileNode rnode,
@@ -361,7 +321,6 @@ smgrblindwrt(int16 which,
 
        return status;
 }
-#endif
 
 /*
  *     smgrmarkdirty() -- Mark a page dirty (needs fsync).
@@ -394,39 +353,6 @@ smgrmarkdirty(int16 which,
  *
  *             Just like smgrmarkdirty, except we don't have a reldesc.
  */
-#ifdef OLD_FILE_NAMING
-int
-smgrblindmarkdirty(int16 which,
-                                  char *dbname,
-                                  char *relname,
-                                  Oid dbid,
-                                  Oid relid,
-                                  BlockNumber blkno)
-{
-       char       *dbstr;
-       char       *relstr;
-       int                     status;
-
-       /* strdup here is probably redundant */
-       dbstr = pstrdup(dbname);
-       relstr = pstrdup(relname);
-
-       status = (*(smgrsw[which].smgr_blindmarkdirty)) (dbstr, relstr,
-                                                                                                        dbid, relid,
-                                                                                                        blkno);
-
-       if (status == SM_FAIL)
-               elog(ERROR, "cannot mark block %d of %s [%s] blind: %m",
-                        blkno, relstr, dbstr);
-
-       pfree(dbstr);
-       pfree(relstr);
-
-       return status;
-}
-
-#else
-
 int
 smgrblindmarkdirty(int16 which,
                                   RelFileNode rnode,
@@ -442,7 +368,6 @@ smgrblindmarkdirty(int16 which,
 
        return status;
 }
-#endif
 
 /*
  *     smgrnblocks() -- Calculate the number of POSTGRES blocks in the
@@ -528,6 +453,27 @@ smgrabort()
        return SM_SUCCESS;
 }
 
+#ifdef XLOG    /* smgrsync() -- run the sync routine of every storage manager that has one */
+int
+smgrsync()
+{
+       int                     i;
+
+       for (i = 0; i < NSmgr; i++)
+       {
+               if (smgrsw[i].smgr_sync)        /* managers without a sync routine are skipped */
+               {
+                       if ((*(smgrsw[i].smgr_sync)) () == SM_FAIL)
+                               elog(STOP, "storage sync failed on %s: %m",
+                                        DatumGetCString(DirectFunctionCall1(smgrout,
+                                                                                                                Int16GetDatum(i))));
+               }
+       }
+
+       return SM_SUCCESS;
+}
+#endif
+
 #ifdef NOT_USED
 bool
 smgriswo(int16 smgrno)
index de3e3c4a8d30d2ae20443168169e3ce7a3255dfe..ea7a8d0212c3f4cb6bedf1ee97c29b3fcd465827 100644 (file)
@@ -8,7 +8,7 @@
  *
  *
  * IDENTIFICATION
- *       $Header: /cvsroot/pgsql/src/backend/utils/cache/relcache.c,v 1.113 2000/10/23 04:10:08 vadim Exp $
+ *       $Header: /cvsroot/pgsql/src/backend/utils/cache/relcache.c,v 1.114 2000/10/28 16:20:57 vadim Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -2064,7 +2064,62 @@ RelationCacheInitializePhase2(void)
        }
 }
 
+#ifdef XLOG            /* used by XLogInitCache */
 
+void CreateDummyCaches(void);
+void DestroyDummyCaches(void);
+
+void
+CreateDummyCaches(void)
+{
+       MemoryContext   oldcxt;
+       HASHCTL                 ctl;
+
+       if (!CacheMemoryContext)        /* create the cache context on first use */
+               CreateCacheMemoryContext();
+
+       oldcxt = MemoryContextSwitchTo(CacheMemoryContext);     /* the hash tables are created with this context current */
+
+       MemSet(&ctl, 0, (int) sizeof(ctl));
+       ctl.keysize = sizeof(NameData);
+       ctl.datasize = sizeof(Relation);        /* datasize is left unchanged for all three tables */
+       RelationNameCache = hash_create(INITRELCACHESIZE, &ctl, HASH_ELEM);     /* no HASH_FUNCTION flag for the name cache */
+
+       ctl.keysize = sizeof(Oid);
+       ctl.hash = tag_hash;
+       RelationIdCache = hash_create(INITRELCACHESIZE, &ctl,
+                                                                 HASH_ELEM | HASH_FUNCTION);
+
+       ctl.keysize = sizeof(RelFileNode);
+       ctl.hash = tag_hash;    /* already tag_hash from above; the reassignment is redundant */
+       RelationNodeCache = hash_create(INITRELCACHESIZE, &ctl,
+                                                                 HASH_ELEM | HASH_FUNCTION);
+       MemoryContextSwitchTo(oldcxt);
+}
+
+void
+DestroyDummyCaches(void)
+{
+       MemoryContext   oldcxt;
+
+       if (!CacheMemoryContext)        /* no cache context: nothing to do */
+               return;
+
+       oldcxt = MemoryContextSwitchTo(CacheMemoryContext);
+
+       if (RelationNameCache)  /* each table is destroyed only if it exists */
+               hash_destroy(RelationNameCache);
+       if (RelationIdCache)
+               hash_destroy(RelationIdCache);
+       if (RelationNodeCache)
+               hash_destroy(RelationNodeCache);
+
+       RelationNameCache = RelationIdCache = RelationNodeCache = NULL; /* so a repeated call finds nothing to destroy */
+
+       MemoryContextSwitchTo(oldcxt);
+}
+
+#endif /* XLOG */
 
 static void
 AttrDefaultFetch(Relation relation)
index cee8dfaac90acf553bc1250ce755c53adcc5641d..fbc9cc2ab2cc21cb7ac8c63cd9d52ec9f1ca7d69 100644 (file)
@@ -8,7 +8,7 @@
  *
  *
  * IDENTIFICATION
- *       $Header: /cvsroot/pgsql/src/backend/utils/init/postinit.c,v 1.68 2000/10/16 14:52:15 vadim Exp $
+ *       $Header: /cvsroot/pgsql/src/backend/utils/init/postinit.c,v 1.69 2000/10/28 16:20:58 vadim Exp $
  *
  *
  *-------------------------------------------------------------------------
@@ -231,9 +231,6 @@ InitPostgres(const char *dbname, const char *username)
 {
        bool            bootstrap = IsBootstrapProcessingMode();
 
-       /* initialize the local buffer manager */
-       InitLocalBuffer();
-
 #ifndef XLOG
        if (!TransactionFlushEnabled())
                on_shmem_exit(FlushBufferPool, 0);
@@ -414,4 +411,8 @@ BaseInit(void)
        smgrinit();
 
        EnablePortalManager();          /* memory for portal/transaction stuff */
+
+       /* initialize the local buffer manager */
+       InitLocalBuffer();
+
 }
index 752682ca969d97b5df0c582d026dfc9f679144b6..415ad56b959c4449aea705ff86b91539ecf861dd 100644 (file)
@@ -7,7 +7,7 @@
  * Portions Copyright (c) 1996-2000, PostgreSQL, Inc
  * Portions Copyright (c) 1994, Regents of the University of California
  *
- * $Id: transam.h,v 1.24 2000/01/26 05:57:51 momjian Exp $
+ * $Id: transam.h,v 1.25 2000/10/28 16:20:59 vadim Exp $
  *
  *      NOTES
  *             Transaction System Version 101 now support proper oid
@@ -67,7 +67,11 @@ typedef unsigned char XidStatus;/* (2 bits) */
  *             transaction page definitions
  * ----------------
  */
+#ifdef XLOG
+#define TP_DataSize                            (BLCKSZ - sizeof(XLogRecPtr))
+#else
 #define TP_DataSize                            BLCKSZ
+#endif
 #define TP_NumXidStatusPerBlock (TP_DataSize * 4)
 
 /* ----------------
@@ -84,6 +88,10 @@ typedef unsigned char XidStatus;/* (2 bits) */
  */
 typedef struct LogRelationContentsData
 {
+#ifdef XLOG
+       XLogRecPtr      LSN;            /* temp hack: LSN is member of any block */
+                                                       /* so should be described in bufmgr */
+#endif
        int                     TransSystemVersion;
 } LogRelationContentsData;
 
@@ -107,6 +115,9 @@ typedef LogRelationContentsData *LogRelationContents;
  */
 typedef struct VariableRelationContentsData
 {
+#ifdef XLOG
+       XLogRecPtr      LSN;
+#endif
        int                     TransSystemVersion;
        TransactionId nextXidData;
        TransactionId lastXidData;      /* unused */
index 712e88b6005828a1b2529e899db7811270650c31..18ca96f3d837758a0317b962c385a038b4fbd381 100644 (file)
@@ -7,7 +7,7 @@
  * Portions Copyright (c) 1996-2000, PostgreSQL, Inc
  * Portions Copyright (c) 1994, Regents of the University of California
  *
- * $Id: xact.h,v 1.28 2000/10/20 11:01:14 vadim Exp $
+ * $Id: xact.h,v 1.29 2000/10/28 16:20:59 vadim Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -135,6 +135,8 @@ extern bool IsTransactionBlock(void);
 extern void UserAbortTransactionBlock(void);
 extern void AbortOutOfAnyTransaction(void);
 
+extern void RecordTransactionCommit(void);
+
 extern TransactionId DisabledTransactionId;
 
 extern void XactPushRollback(void (*func) (void *), void* data);
index c77c1cac02a6431293c34d4beec857c7d982b20b..02998755c328e889f53a0d8d9057c9cf9637861e 100644 (file)
 
 #include "access/rmgr.h"
 #include "access/transam.h"
-
-typedef struct XLogRecPtr
-{
-       uint32          xlogid;                 /* log file #, 0 based */
-       uint32          xrecoff;                /* offset of record in log file */
-} XLogRecPtr;
+#include "access/xlogdefs.h"
 
 typedef struct XLogRecord
 {
@@ -83,12 +78,7 @@ typedef XLogPageHeaderData *XLogPageHeader;
 #define XLByteEQ(left, right)          \
                        (right.xlogid == left.xlogid && right.xrecoff ==  left.xrecoff)
 
-/*
- * StartUpID (SUI) - system startups counter.
- * It's to allow removing pg_log after shutdown.
- */
-typedef        uint32          StartUpID;
-extern StartUpID       ThisStartUpID;
+extern StartUpID       ThisStartUpID;  /* current SUI */
 extern bool            InRecovery;
 extern XLogRecPtr      MyLastRecPtr;
 
diff --git a/src/include/access/xlogdefs.h b/src/include/access/xlogdefs.h
new file mode 100644 (file)
index 0000000..ce1b3ef
--- /dev/null
@@ -0,0 +1,24 @@
+/*
+ *
+ * xlogdefs.h
+ *
+ * Postgres transaction log manager record pointer and
+ * system startup number definitions
+ *
+ */
+#ifndef XLOG_DEFS_H
+#define XLOG_DEFS_H
+
+typedef struct XLogRecPtr
+{
+       uint32          xlogid;                 /* log file #, 0 based */
+       uint32          xrecoff;                /* offset of record in log file */
+} XLogRecPtr;
+
+/*
+ * StartUpID (SUI) - system startups counter. It's to allow removing
+ * pg_log after shutdown, in future.
+ */
+typedef        uint32          StartUpID;
+
+#endif  /* XLOG_DEFS_H */
index f62f726d8313f27ad1649321f55789c6f3b98d3e..b8fa3549f42991a1f26b305a90436dd2cd14dc8f 100644 (file)
@@ -9,8 +9,10 @@ extern bool XLogIsValidTuple(RelFileNode hnode, ItemPointer iptr);
 
 extern void XLogOpenLogRelation(void);
 
-extern Buffer XLogReadBuffer(bool extend, Relation reln, BlockNumber blkno);
+extern void XLogInitRelationCache(void);
 extern void XLogCloseRelationCache(void);
+
 extern Relation XLogOpenRelation(bool redo, RmgrId rmid, RelFileNode rnode);
+extern Buffer XLogReadBuffer(bool extend, Relation reln, BlockNumber blkno);
 
 #endif
index 65abe9b8ceb5a1502f1d64225d65ddcb54d48a07..80aca7c57e96f9035797916007e5742fea6607cc 100644 (file)
@@ -7,7 +7,7 @@
  * Portions Copyright (c) 1996-2000, PostgreSQL, Inc
  * Portions Copyright (c) 1994, Regents of the University of California
  *
- * $Id: buf_internals.h,v 1.41 2000/10/23 04:10:14 vadim Exp $
+ * $Id: buf_internals.h,v 1.42 2000/10/28 16:21:00 vadim Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -109,6 +109,10 @@ typedef struct sbufdesc
        bool            ri_lock;                /* read-intent lock */
        bool            w_lock;                 /* context exclusively locked */
 
+#ifdef XLOG
+       bool            cntxDirty;              /* new way to mark block as dirty */
+#endif
+
        BufferBlindId blind;            /* was used to support blind write */
 
        /*
index 551f98e75f94437a20b8a1a9764e847d7b6b1eeb..0ed4837305d5c031aeaa30601ab275774b744bc8 100644 (file)
@@ -7,7 +7,7 @@
  * Portions Copyright (c) 1996-2000, PostgreSQL, Inc
  * Portions Copyright (c) 1994, Regents of the University of California
  *
- * $Id: bufmgr.h,v 1.41 2000/10/20 11:01:21 vadim Exp $
+ * $Id: bufmgr.h,v 1.42 2000/10/28 16:21:00 vadim Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -15,7 +15,7 @@
 #define BUFMGR_H
 
 #include "storage/buf_internals.h"
-
+#include "access/xlogdefs.h"
 
 typedef void *Block;
 
@@ -177,4 +177,9 @@ extern void AbortBufferIO(void);
 extern bool BufferIsUpdatable(Buffer buffer);
 extern void MarkBufferForCleanup(Buffer buffer, void (*CleanupFunc)(Buffer));
 
+#ifdef XLOG
+extern void BufmgrCommit(void);
+extern void BufferSync(void);
+#endif
+
 #endif
index d547f71b736a9a4959a95a868d5801a0506cdacb..78b22f392cff48f5588939f83d8bffe0341ffc67 100644 (file)
@@ -7,7 +7,7 @@
  * Portions Copyright (c) 1996-2000, PostgreSQL, Inc
  * Portions Copyright (c) 1994, Regents of the University of California
  *
- * $Id: bufpage.h,v 1.34 2000/10/21 15:43:36 vadim Exp $
+ * $Id: bufpage.h,v 1.35 2000/10/28 16:21:00 vadim Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -118,7 +118,8 @@ typedef OpaqueData *Opaque;
  */
 typedef struct PageHeaderData
 {
-#ifdef XLOG
+#ifdef XLOG                                            /* XXX LSN is member of *any* block, not */
+                                                               /* only page-organized - will change later */
        XLogRecPtr      pd_lsn;                 /* LSN: next byte after last byte of xlog */
                                                                /* record for last change of this page */
        StartUpID       pd_sui;                 /* SUI of last changes (currently it's */
index 7caac813e9acb7bafb999dc0e3075bfe59b86dba..49a2e3e5e922da16fb829891d1312a37c14aa577 100644 (file)
@@ -7,7 +7,7 @@
  * Portions Copyright (c) 1996-2000, PostgreSQL, Inc
  * Portions Copyright (c) 1994, Regents of the University of California
  *
- * $Id: smgr.h,v 1.22 2000/10/16 14:52:28 vadim Exp $
+ * $Id: smgr.h,v 1.23 2000/10/28 16:21:00 vadim Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -36,26 +36,19 @@ extern int smgrwrite(int16 which, Relation reln, BlockNumber blocknum,
                  char *buffer);
 extern int smgrflush(int16 which, Relation reln, BlockNumber blocknum,
                  char *buffer);
-#ifdef OLD_FILE_NAMING
-extern int smgrblindwrt(int16 which, char *dbname, char *relname,
-                        Oid dbid, Oid relid,
-                        BlockNumber blkno, char *buffer,
-                        bool dofsync);
-extern int smgrblindmarkdirty(int16 which, char *dbname, char *relname,
-                                  Oid dbid, Oid relid,
-                                  BlockNumber blkno);
-#else
 extern int smgrblindwrt(int16 which, RelFileNode rnode,
                                                BlockNumber blkno, char *buffer, bool dofsync);
 extern int smgrblindmarkdirty(int16 which, RelFileNode rnode,
                                                BlockNumber blkno);
-#endif
 extern int     smgrmarkdirty(int16 which, Relation reln, BlockNumber blkno);
 extern int     smgrnblocks(int16 which, Relation reln);
 extern int     smgrtruncate(int16 which, Relation reln, int nblocks);
 extern int     smgrcommit(void);
 extern int     smgrabort(void);
 
+#ifdef XLOG
+extern int     smgrsync(void);
+#endif
 
 
 /* internals: move me elsewhere -- ay 7/94 */
@@ -71,22 +64,18 @@ extern int  mdread(Relation reln, BlockNumber blocknum, char *buffer);
 extern int     mdwrite(Relation reln, BlockNumber blocknum, char *buffer);
 extern int     mdflush(Relation reln, BlockNumber blocknum, char *buffer);
 extern int     mdmarkdirty(Relation reln, BlockNumber blkno);
-#ifdef OLD_FILE_NAMING
-extern int mdblindwrt(char *dbname, char *relname, Oid dbid, Oid relid,
-                  BlockNumber blkno, char *buffer,
-                  bool dofsync);
-extern int mdblindmarkdirty(char *dbname, char *relname, Oid dbid, Oid relid,
-                                BlockNumber blkno);
-#else
 extern int mdblindwrt(RelFileNode rnode, BlockNumber blkno,
                                                char *buffer, bool dofsync);
 extern int mdblindmarkdirty(RelFileNode rnode, BlockNumber blkno);
-#endif
 extern int     mdnblocks(Relation reln);
 extern int     mdtruncate(Relation reln, int nblocks);
 extern int     mdcommit(void);
 extern int     mdabort(void);
 
+#ifdef XLOG
+extern int     mdsync(void);
+#endif
+
 /* mm.c */
 extern SPINLOCK MMCacheLock;