]> git.ipfire.org Git - thirdparty/postgresql.git/commitdiff
Get rid of WALBufMappingLock
authorAlexander Korotkov <akorotkov@postgresql.org>
Mon, 17 Feb 2025 02:19:01 +0000 (04:19 +0200)
committerAlexander Korotkov <akorotkov@postgresql.org>
Mon, 17 Feb 2025 02:25:29 +0000 (04:25 +0200)
Allow multiple backends to initialize WAL buffers concurrently.  This way
`MemSet((char *) NewPage, 0, XLOG_BLCKSZ);` can run in parallel without
taking a single LWLock in exclusive mode.

The new algorithm works as follows:
 * reserve a page for initialization using XLogCtl->InitializeReserved,
 * ensure the page is written out,
 * once the page is initialized, try to advance XLogCtl->InitializedUpTo and
   signal to waiters using XLogCtl->InitializedUpToCondVar condition
   variable,
 * repeat previous steps until we reserve initialization up to the target
   WAL position,
 * wait until concurrent initialization finishes using a
   XLogCtl->InitializedUpToCondVar.

Now, multiple backends can, in parallel, concurrently reserve pages,
initialize them, and advance XLogCtl->InitializedUpTo to point to the latest
initialized page.

Author: Yura Sokolov <y.sokolov@postgrespro.ru>
Co-authored-by: Alexander Korotkov <aekorotkov@gmail.com>
Reviewed-by: Pavel Borisov <pashkin.elfe@gmail.com>
src/backend/access/transam/xlog.c
src/backend/utils/activity/wait_event_names.txt
src/include/storage/lwlocklist.h

index a50fd99d9e582edb98372c7c3a6e040cf0a934d8..87f9fb7a654fc99d8eaf31ccce9979fb283b38ce 100644 (file)
@@ -302,11 +302,6 @@ static bool doPageWrites;
  * so it's a plain spinlock.  The other locks are held longer (potentially
  * over I/O operations), so we use LWLocks for them.  These locks are:
  *
- * WALBufMappingLock: must be held to replace a page in the WAL buffer cache.
- * It is only held while initializing and changing the mapping.  If the
- * contents of the buffer being replaced haven't been written yet, the mapping
- * lock is released while the write is done, and reacquired afterwards.
- *
  * WALWriteLock: must be held to write WAL buffers to disk (XLogWrite or
  * XLogFlush).
  *
@@ -473,21 +468,32 @@ typedef struct XLogCtlData
        pg_atomic_uint64 logFlushResult;        /* last byte + 1 flushed */
 
        /*
-        * Latest initialized page in the cache (last byte position + 1).
+        * Latest reserved for inititalization page in the cache (last byte
+        * position + 1).
         *
-        * To change the identity of a buffer (and InitializedUpTo), you need to
-        * hold WALBufMappingLock.  To change the identity of a buffer that's
+        * To change the identity of a buffer, you need to advance
+        * InitializeReserved first.  To change the identity of a buffer that's
         * still dirty, the old page needs to be written out first, and for that
         * you need WALWriteLock, and you need to ensure that there are no
         * in-progress insertions to the page by calling
         * WaitXLogInsertionsToFinish().
         */
-       XLogRecPtr      InitializedUpTo;
+       pg_atomic_uint64 InitializeReserved;
+
+       /*
+        * Latest initialized page in the cache (last byte position + 1).
+        *
+        * InitializedUpTo is updated after the buffer initialization.  After
+        * update, waiters got notification using InitializedUpToCondVar.
+        */
+       pg_atomic_uint64 InitializedUpTo;
+       ConditionVariable InitializedUpToCondVar;
 
        /*
         * These values do not change after startup, although the pointed-to pages
-        * and xlblocks values certainly do.  xlblocks values are protected by
-        * WALBufMappingLock.
+        * and xlblocks values certainly do.  xlblocks values are changed
+        * lock-free according to the check for the xlog write position and are
+        * accompanied by changes of InitializeReserved and InitializedUpTo.
         */
        char       *pages;                      /* buffers for unwritten XLOG pages */
        pg_atomic_uint64 *xlblocks; /* 1st byte ptr-s + XLOG_BLCKSZ */
@@ -810,9 +816,9 @@ XLogInsertRecord(XLogRecData *rdata,
         * fullPageWrites from changing until the insertion is finished.
         *
         * Step 2 can usually be done completely in parallel. If the required WAL
-        * page is not initialized yet, you have to grab WALBufMappingLock to
-        * initialize it, but the WAL writer tries to do that ahead of insertions
-        * to avoid that from happening in the critical path.
+        * page is not initialized yet, you have to go through AdvanceXLInsertBuffer,
+        * which will ensure it is initialized. But the WAL writer tries to do that
+        * ahead of insertions to avoid that from happening in the critical path.
         *
         *----------
         */
@@ -1991,32 +1997,70 @@ AdvanceXLInsertBuffer(XLogRecPtr upto, TimeLineID tli, bool opportunistic)
        XLogRecPtr      NewPageEndPtr = InvalidXLogRecPtr;
        XLogRecPtr      NewPageBeginPtr;
        XLogPageHeader NewPage;
+       XLogRecPtr      ReservedPtr;
        int                     npages pg_attribute_unused() = 0;
 
-       LWLockAcquire(WALBufMappingLock, LW_EXCLUSIVE);
-
        /*
-        * Now that we have the lock, check if someone initialized the page
-        * already.
+        * We must run the loop below inside the critical section as we expect
+        * XLogCtl->InitializedUpTo to eventually keep up.  The most of callers
+        * already run inside the critical section. Except for WAL writer, which
+        * passed 'opportunistic == true', and therefore we don't perform
+        * operations that could error out.
+        *
+        * Start an explicit critical section anyway though.
         */
-       while (upto >= XLogCtl->InitializedUpTo || opportunistic)
+       Assert(CritSectionCount > 0 || opportunistic);
+       START_CRIT_SECTION();
+
+       /*--
+        * Loop till we get all the pages in WAL buffer before 'upto' reserved for
+        * initialization.  Multiple process can initialize different buffers with
+        * this loop in parallel as following.
+        *
+        * 1. Reserve page for initialization using XLogCtl->InitializeReserved.
+        * 2. Initialize the reserved page.
+        * 3. Attempt to advance XLogCtl->InitializedUpTo,
+        */
+       ReservedPtr = pg_atomic_read_u64(&XLogCtl->InitializeReserved);
+       while (upto >= ReservedPtr || opportunistic)
        {
-               nextidx = XLogRecPtrToBufIdx(XLogCtl->InitializedUpTo);
+               Assert(ReservedPtr % XLOG_BLCKSZ == 0);
 
                /*
-                * Get ending-offset of the buffer page we need to replace (this may
-                * be zero if the buffer hasn't been used yet).  Fall through if it's
-                * already written out.
+                * Get ending-offset of the buffer page we need to replace.
+                *
+                * We don't lookup into xlblocks, but rather calculate position we
+                * must wait to be written. If it was written, xlblocks will have this
+                * position (or uninitialized)
                 */
-               OldPageRqstPtr = pg_atomic_read_u64(&XLogCtl->xlblocks[nextidx]);
-               if (LogwrtResult.Write < OldPageRqstPtr)
+               if (ReservedPtr + XLOG_BLCKSZ > XLOG_BLCKSZ * XLOGbuffers)
+                       OldPageRqstPtr = ReservedPtr + XLOG_BLCKSZ - XLOG_BLCKSZ * XLOGbuffers;
+               else
+                       OldPageRqstPtr = InvalidXLogRecPtr;
+
+               if (LogwrtResult.Write < OldPageRqstPtr && opportunistic)
                {
                        /*
-                        * Nope, got work to do. If we just want to pre-initialize as much
-                        * as we can without flushing, give up now.
+                        * If we just want to pre-initialize as much as we can without
+                        * flushing, give up now.
                         */
-                       if (opportunistic)
-                               break;
+                       upto = ReservedPtr - 1;
+                       break;
+               }
+
+               /*
+                * Attempt to reserve the page for initialization.  Failure means that
+                * this page got reserved by another process.
+                */
+               if (!pg_atomic_compare_exchange_u64(&XLogCtl->InitializeReserved,
+                                                                                       &ReservedPtr,
+                                                                                       ReservedPtr + XLOG_BLCKSZ))
+                       continue;
+
+               /* Fall through if it's already written out. */
+               if (LogwrtResult.Write < OldPageRqstPtr)
+               {
+                       /* Nope, got work to do. */
 
                        /* Advance shared memory write request position */
                        SpinLockAcquire(&XLogCtl->info_lck);
@@ -2031,14 +2075,6 @@ AdvanceXLInsertBuffer(XLogRecPtr upto, TimeLineID tli, bool opportunistic)
                        RefreshXLogWriteResult(LogwrtResult);
                        if (LogwrtResult.Write < OldPageRqstPtr)
                        {
-                               /*
-                                * Must acquire write lock. Release WALBufMappingLock first,
-                                * to make sure that all insertions that we need to wait for
-                                * can finish (up to this same position). Otherwise we risk
-                                * deadlock.
-                                */
-                               LWLockRelease(WALBufMappingLock);
-
                                WaitXLogInsertionsToFinish(OldPageRqstPtr);
 
                                LWLockAcquire(WALWriteLock, LW_EXCLUSIVE);
@@ -2060,9 +2096,6 @@ AdvanceXLInsertBuffer(XLogRecPtr upto, TimeLineID tli, bool opportunistic)
                                        PendingWalStats.wal_buffers_full++;
                                        TRACE_POSTGRESQL_WAL_BUFFER_WRITE_DIRTY_DONE();
                                }
-                               /* Re-acquire WALBufMappingLock and retry */
-                               LWLockAcquire(WALBufMappingLock, LW_EXCLUSIVE);
-                               continue;
                        }
                }
 
@@ -2070,10 +2103,17 @@ AdvanceXLInsertBuffer(XLogRecPtr upto, TimeLineID tli, bool opportunistic)
                 * Now the next buffer slot is free and we can set it up to be the
                 * next output page.
                 */
-               NewPageBeginPtr = XLogCtl->InitializedUpTo;
+               NewPageBeginPtr = ReservedPtr;
                NewPageEndPtr = NewPageBeginPtr + XLOG_BLCKSZ;
+               nextidx = XLogRecPtrToBufIdx(ReservedPtr);
 
-               Assert(XLogRecPtrToBufIdx(NewPageBeginPtr) == nextidx);
+#ifdef USE_ASSERT_CHECKING
+               {
+                       XLogRecPtr      storedBound = pg_atomic_read_u64(&XLogCtl->xlblocks[nextidx]);
+
+                       Assert(storedBound == OldPageRqstPtr || storedBound == InvalidXLogRecPtr);
+               }
+#endif
 
                NewPage = (XLogPageHeader) (XLogCtl->pages + nextidx * (Size) XLOG_BLCKSZ);
 
@@ -2139,11 +2179,50 @@ AdvanceXLInsertBuffer(XLogRecPtr upto, TimeLineID tli, bool opportunistic)
                pg_write_barrier();
 
                pg_atomic_write_u64(&XLogCtl->xlblocks[nextidx], NewPageEndPtr);
-               XLogCtl->InitializedUpTo = NewPageEndPtr;
+
+               /*
+                * Try to advance XLogCtl->InitializedUpTo.
+                *
+                * If the CAS operation failed, then some of previous pages are not
+                * initialized yet, and this backend gives up.
+                *
+                * Since initializer of next page might give up on advancing of
+                * InitializedUpTo, this backend have to attempt advancing until it
+                * find page "in the past" or concurrent backend succeeded at
+                * advancing.  When we finish advancing XLogCtl->InitializedUpTo, we
+                * notify all the waiters with XLogCtl->InitializedUpToCondVar.
+                */
+               while (pg_atomic_compare_exchange_u64(&XLogCtl->InitializedUpTo, &NewPageBeginPtr, NewPageEndPtr))
+               {
+                       NewPageBeginPtr = NewPageEndPtr;
+                       NewPageEndPtr = NewPageBeginPtr + XLOG_BLCKSZ;
+                       nextidx = XLogRecPtrToBufIdx(NewPageBeginPtr);
+
+                       if (pg_atomic_read_u64(&XLogCtl->xlblocks[nextidx]) != NewPageEndPtr)
+                       {
+                               /*
+                                * Page at nextidx wasn't initialized yet, so we cann't move
+                                * InitializedUpto further. It will be moved by backend which
+                                * will initialize nextidx.
+                                */
+                               ConditionVariableBroadcast(&XLogCtl->InitializedUpToCondVar);
+                               break;
+                       }
+               }
 
                npages++;
        }
-       LWLockRelease(WALBufMappingLock);
+
+       END_CRIT_SECTION();
+
+       /*
+        * All the pages in WAL buffer before 'upto' were reserved for
+        * initialization.  However, some pages might be reserved by concurrent
+        * processes.  Wait till they finish initialization.
+        */
+       while (upto >= pg_atomic_read_u64(&XLogCtl->InitializedUpTo))
+               ConditionVariableSleep(&XLogCtl->InitializedUpToCondVar, WAIT_EVENT_WAL_BUFFER_INIT);
+       ConditionVariableCancelSleep();
 
 #ifdef WAL_DEBUG
        if (XLOG_DEBUG && npages > 0)
@@ -5044,6 +5123,10 @@ XLOGShmemInit(void)
        pg_atomic_init_u64(&XLogCtl->logWriteResult, InvalidXLogRecPtr);
        pg_atomic_init_u64(&XLogCtl->logFlushResult, InvalidXLogRecPtr);
        pg_atomic_init_u64(&XLogCtl->unloggedLSN, InvalidXLogRecPtr);
+
+       pg_atomic_init_u64(&XLogCtl->InitializeReserved, InvalidXLogRecPtr);
+       pg_atomic_init_u64(&XLogCtl->InitializedUpTo, InvalidXLogRecPtr);
+       ConditionVariableInit(&XLogCtl->InitializedUpToCondVar);
 }
 
 /*
@@ -6063,7 +6146,7 @@ StartupXLOG(void)
                memset(page + len, 0, XLOG_BLCKSZ - len);
 
                pg_atomic_write_u64(&XLogCtl->xlblocks[firstIdx], endOfRecoveryInfo->lastPageBeginPtr + XLOG_BLCKSZ);
-               XLogCtl->InitializedUpTo = endOfRecoveryInfo->lastPageBeginPtr + XLOG_BLCKSZ;
+               pg_atomic_write_u64(&XLogCtl->InitializedUpTo, endOfRecoveryInfo->lastPageBeginPtr + XLOG_BLCKSZ);
        }
        else
        {
@@ -6072,8 +6155,9 @@ StartupXLOG(void)
                 * let the first attempt to insert a log record to initialize the next
                 * buffer.
                 */
-               XLogCtl->InitializedUpTo = EndOfLog;
+               pg_atomic_write_u64(&XLogCtl->InitializedUpTo, EndOfLog);
        }
+       pg_atomic_write_u64(&XLogCtl->InitializeReserved, pg_atomic_read_u64(&XLogCtl->InitializedUpTo));
 
        /*
         * Update local and shared status.  This is OK to do without any locks
index e199f071628987ec0626847d8c18632293dd3e89..ccf73781d81a6981fe62c579a4a85cc2cba389b0 100644 (file)
@@ -155,6 +155,7 @@ REPLICATION_SLOT_DROP       "Waiting for a replication slot to become inactive so it c
 RESTORE_COMMAND        "Waiting for <xref linkend="guc-restore-command"/> to complete."
 SAFE_SNAPSHOT  "Waiting to obtain a valid snapshot for a <literal>READ ONLY DEFERRABLE</literal> transaction."
 SYNC_REP       "Waiting for confirmation from a remote server during synchronous replication."
+WAL_BUFFER_INIT        "Waiting on WAL buffer to be initialized."
 WAL_RECEIVER_EXIT      "Waiting for the WAL receiver to exit."
 WAL_RECEIVER_WAIT_START        "Waiting for startup process to send initial data for streaming replication."
 WAL_SUMMARY_READY      "Waiting for a new WAL summary to be generated."
@@ -310,7 +311,6 @@ XidGen      "Waiting to allocate a new transaction ID."
 ProcArray      "Waiting to access the shared per-process data structures (typically, to get a snapshot or report a session's transaction ID)."
 SInvalRead     "Waiting to retrieve messages from the shared catalog invalidation queue."
 SInvalWrite    "Waiting to add a message to the shared catalog invalidation queue."
-WALBufMapping  "Waiting to replace a page in WAL buffers."
 WALWrite       "Waiting for WAL buffers to be written to disk."
 ControlFile    "Waiting to read or update the <filename>pg_control</filename> file or create a new WAL file."
 MultiXactGen   "Waiting to read or update shared multixact state."
index cf565452382be392f7db53e4baacb3bd27d133c5..ff897515769a2dcac6debe57999fc59d027de339 100644 (file)
@@ -37,7 +37,7 @@ PG_LWLOCK(3, XidGen)
 PG_LWLOCK(4, ProcArray)
 PG_LWLOCK(5, SInvalRead)
 PG_LWLOCK(6, SInvalWrite)
-PG_LWLOCK(7, WALBufMapping)
+/* 7 was WALBufMapping */
 PG_LWLOCK(8, WALWrite)
 PG_LWLOCK(9, ControlFile)
 /* 10 was CheckpointLock */