]> git.ipfire.org Git - thirdparty/postgresql.git/commitdiff
localbuf: Introduce StartLocalBufferIO()
authorAndres Freund <andres@anarazel.de>
Sat, 15 Mar 2025 16:30:07 +0000 (12:30 -0400)
committerAndres Freund <andres@anarazel.de>
Sun, 16 Mar 2025 02:07:48 +0000 (22:07 -0400)
To initiate IO on a shared buffer we have StartBufferIO(). For temporary table
buffers no similar function exists - likely because the code for that
currently is very simple due to the lack of concurrency.

However, the upcoming AIO support will make it possible to re-encounter a
local buffer, while the buffer already is the target of IO. In that case we
need to wait for already in-progress IO to complete. This commit makes it
easier to add the necessary code, by introducing StartLocalBufferIO().

Reviewed-by: Melanie Plageman <melanieplageman@gmail.com>
Discussion: https://postgr.es/m/CAAKRu_b9anbWzEs5AAF9WCvcEVmgz-1AkHSQ-CLLy-p7WHzvFw@mail.gmail.com

src/backend/storage/buffer/bufmgr.c
src/backend/storage/buffer/localbuf.c
src/include/storage/buf_internals.h

index f3c27d7e77adaf289861e02396bb58752824eee8..79ca9d18d07b04cea6fab7722d43867099aae3c8 100644 (file)
@@ -1038,7 +1038,7 @@ ZeroAndLockBuffer(Buffer buffer, ReadBufferMode mode, bool already_valid)
        {
                /* Simple case for non-shared buffers. */
                bufHdr = GetLocalBufferDescriptor(-buffer - 1);
-               need_to_zero = (pg_atomic_read_u32(&bufHdr->state) & BM_VALID) == 0;
+               need_to_zero = StartLocalBufferIO(bufHdr, true);
        }
        else
        {
@@ -1388,11 +1388,7 @@ static inline bool
 WaitReadBuffersCanStartIO(Buffer buffer, bool nowait)
 {
        if (BufferIsLocal(buffer))
-       {
-               BufferDesc *bufHdr = GetLocalBufferDescriptor(-buffer - 1);
-
-               return (pg_atomic_read_u32(&bufHdr->state) & BM_VALID) == 0;
-       }
+               return StartLocalBufferIO(GetLocalBufferDescriptor(-buffer - 1), true);
        else
                return StartBufferIO(GetBufferDescriptor(buffer - 1), true, nowait);
 }
index 8efde05c0a5fb6cc88f30ac558532ef00b44d86f..f172a5c7820732ba6e1ea56f183f1c4516ed59b4 100644 (file)
@@ -183,6 +183,13 @@ FlushLocalBuffer(BufferDesc *bufHdr, SMgrRelation reln)
        instr_time      io_start;
        Page            localpage = (char *) LocalBufHdrGetBlock(bufHdr);
 
+       /*
+        * Try to start an I/O operation.  There currently are no reasons for
+        * StartLocalBufferIO to return false, so we raise an error in that case.
+        */
+       if (!StartLocalBufferIO(bufHdr, false))
+               elog(ERROR, "failed to start write IO on local buffer");
+
        /* Find smgr relation for buffer */
        if (reln == NULL)
                reln = smgropen(BufTagGetRelFileLocator(&bufHdr->tag),
@@ -406,11 +413,17 @@ ExtendBufferedRelLocal(BufferManagerRelation bmr,
                        PinLocalBuffer(existing_hdr, false);
                        buffers[i] = BufferDescriptorGetBuffer(existing_hdr);
 
+                       /*
+                        * Clear the BM_VALID bit, do StartLocalBufferIO() and proceed.
+                        */
                        buf_state = pg_atomic_read_u32(&existing_hdr->state);
                        Assert(buf_state & BM_TAG_VALID);
                        Assert(!(buf_state & BM_DIRTY));
                        buf_state &= ~BM_VALID;
                        pg_atomic_unlocked_write_u32(&existing_hdr->state, buf_state);
+
+                       /* no need to loop for local buffers */
+                       StartLocalBufferIO(existing_hdr, true);
                }
                else
                {
@@ -425,6 +438,8 @@ ExtendBufferedRelLocal(BufferManagerRelation bmr,
                        pg_atomic_unlocked_write_u32(&victim_buf_hdr->state, buf_state);
 
                        hresult->id = victim_buf_id;
+
+                       StartLocalBufferIO(victim_buf_hdr, true);
                }
        }
 
@@ -489,6 +504,27 @@ MarkLocalBufferDirty(Buffer buffer)
        pg_atomic_unlocked_write_u32(&bufHdr->state, buf_state);
 }
 
+/*
+ * Like StartBufferIO, but for local buffers
+ */
+bool
+StartLocalBufferIO(BufferDesc *bufHdr, bool forInput)
+{
+       uint32          buf_state = pg_atomic_read_u32(&bufHdr->state);
+
+       if (forInput ? (buf_state & BM_VALID) : !(buf_state & BM_DIRTY))
+       {
+               /* someone else already did the I/O */
+               return false;
+       }
+
+       /* BM_IO_IN_PROGRESS isn't currently used for local buffers */
+
+       /* local buffers don't track IO using resowners */
+
+       return true;
+}
+
 /*
  * Like TerminateBufferIO, but for local buffers
  */
index 90bc7e0db7be25318d5726a93e7a882328bd5e81..9327f60c44ccf64efae05c458d70525c1bdb3620 100644 (file)
@@ -473,6 +473,7 @@ extern BlockNumber ExtendBufferedRelLocal(BufferManagerRelation bmr,
 extern void MarkLocalBufferDirty(Buffer buffer);
 extern void TerminateLocalBufferIO(BufferDesc *bufHdr, bool clear_dirty,
                                                                   uint32 set_flag_bits);
+extern bool StartLocalBufferIO(BufferDesc *bufHdr, bool forInput);
 extern void FlushLocalBuffer(BufferDesc *bufHdr, SMgrRelation reln);
 extern void DropRelationLocalBuffers(RelFileLocator rlocator,
                                                                         ForkNumber forkNum,