]> git.ipfire.org Git - thirdparty/postgresql.git/commitdiff
bufmgr: Improve StartBufferIO interface
authorAndres Freund <andres@anarazel.de>
Fri, 27 Mar 2026 23:02:23 +0000 (19:02 -0400)
committerAndres Freund <andres@anarazel.de>
Fri, 27 Mar 2026 23:08:12 +0000 (19:08 -0400)
Until now StartBufferIO() had a few weaknesses:

- As it did not submit staged IOs, it was not safe to call StartBufferIO()
  where there was a potential for unsubmitted IO, which required
  AsyncReadBuffers() to use a wrapper (ReadBuffersCanStartIO()) around
  StartBufferIO().

- With nowait = true, the boolean return value did not allow to distinguish
  between no IO being necessary and having to wait, which would lead
  ReadBuffersCanStartIO() to unnecessarily submit staged IO.

- Several callers needed to handle both local and shared buffers, requiring
  the caller to differentiate between StartBufferIO() and StartLocalBufferIO()

- In a future commit some callers of StartBufferIO() want the BufferDesc's
  io_wref to be returned, to asynchronously wait for in-progress IO

- Indicating whether to wait with the nowait parameter was somewhat confusing
  compared to a wait parameter

Address these issues as follows:

- StartBufferIO() is renamed to StartSharedBufferIO()

- A new StartBufferIO() is introduced that supports both shared and local
  buffers

- The boolean return value has been replaced with an enum, indicating whether
  the IO is already done, already in progress or that the buffer has been
  readied for IO

- A new PgAioWaitRef * argument allows the caller to get the wait reference is
  desired.  All current callers pass NULL, a user of this will be introduced
  subsequently

- Instead of the nowait argument there now is wait

  This probably would not have been worthwhile on its own, but since all these
  lines needed to be touched anyway...

Author: Andres Freund <andres@anarazel.de>
Author: Melanie Plageman <melanieplageman@gmail.com>
Discussion: https://postgr.es/m/zljergweqti7x67lg5ije2rzjusie37nslsnkjkkby4laqqbfw@3p3zu522yykv

src/backend/storage/buffer/bufmgr.c
src/backend/storage/buffer/localbuf.c
src/include/storage/buf_internals.h
src/test/modules/test_aio/t/001_aio.pl
src/test/modules/test_aio/test_aio--1.0.sql
src/test/modules/test_aio/test_aio.c
src/tools/pgindent/typedefs.list

index da87c85a079f8c509d1856d465121f80c334bf7c..4d8c3b5d37b98e24184a9c470f09e339706bb077 100644 (file)
@@ -1148,6 +1148,7 @@ ZeroAndLockBuffer(Buffer buffer, ReadBufferMode mode, bool already_valid)
        BufferDesc *bufHdr;
        bool            need_to_zero;
        bool            isLocalBuf = BufferIsLocal(buffer);
+       StartBufferIOResult sbres;
 
        Assert(mode == RBM_ZERO_AND_LOCK || mode == RBM_ZERO_AND_CLEANUP_LOCK);
 
@@ -1159,24 +1160,30 @@ ZeroAndLockBuffer(Buffer buffer, ReadBufferMode mode, bool already_valid)
                 */
                need_to_zero = false;
        }
-       else if (isLocalBuf)
-       {
-               /* Simple case for non-shared buffers. */
-               bufHdr = GetLocalBufferDescriptor(-buffer - 1);
-               need_to_zero = StartLocalBufferIO(bufHdr, true, false);
-       }
        else
        {
-               /*
-                * Take BM_IO_IN_PROGRESS, or discover that BM_VALID has been set
-                * concurrently.  Even though we aren't doing I/O, that ensures that
-                * we don't zero a page that someone else has pinned.  An exclusive
-                * content lock wouldn't be enough, because readers are allowed to
-                * drop the content lock after determining that a tuple is visible
-                * (see buffer access rules in README).
-                */
-               bufHdr = GetBufferDescriptor(buffer - 1);
-               need_to_zero = StartBufferIO(bufHdr, true, false);
+               if (isLocalBuf)
+               {
+                       /* Simple case for non-shared buffers. */
+                       bufHdr = GetLocalBufferDescriptor(-buffer - 1);
+                       sbres = StartLocalBufferIO(bufHdr, true, true, NULL);
+               }
+               else
+               {
+                       /*
+                        * Take BM_IO_IN_PROGRESS, or discover that BM_VALID has been set
+                        * concurrently.  Even though we aren't doing I/O, that ensures
+                        * that we don't zero a page that someone else has pinned.  An
+                        * exclusive content lock wouldn't be enough, because readers are
+                        * allowed to drop the content lock after determining that a tuple
+                        * is visible (see buffer access rules in README).
+                        */
+                       bufHdr = GetBufferDescriptor(buffer - 1);
+                       sbres = StartSharedBufferIO(bufHdr, true, true, NULL);
+               }
+
+               Assert(sbres != BUFFER_IO_IN_PROGRESS);
+               need_to_zero = sbres == BUFFER_IO_READY_FOR_IO;
        }
 
        if (need_to_zero)
@@ -1659,45 +1666,6 @@ CheckReadBuffersOperation(ReadBuffersOperation *operation, bool is_complete)
 #endif
 }
 
-/* helper for ReadBuffersCanStartIO(), to avoid repetition */
-static inline bool
-ReadBuffersCanStartIOOnce(Buffer buffer, bool nowait)
-{
-       if (BufferIsLocal(buffer))
-               return StartLocalBufferIO(GetLocalBufferDescriptor(-buffer - 1),
-                                                                 true, nowait);
-       else
-               return StartBufferIO(GetBufferDescriptor(buffer - 1), true, nowait);
-}
-
-/*
- * Helper for AsyncReadBuffers that tries to get the buffer ready for IO.
- */
-static inline bool
-ReadBuffersCanStartIO(Buffer buffer, bool nowait)
-{
-       /*
-        * If this backend currently has staged IO, we need to submit the pending
-        * IO before waiting for the right to issue IO, to avoid the potential for
-        * deadlocks (and, more commonly, unnecessary delays for other backends).
-        */
-       if (!nowait && pgaio_have_staged())
-       {
-               if (ReadBuffersCanStartIOOnce(buffer, true))
-                       return true;
-
-               /*
-                * Unfortunately StartBufferIO() returning false doesn't allow to
-                * distinguish between the buffer already being valid and IO already
-                * being in progress. Since IO already being in progress is quite
-                * rare, this approach seems fine.
-                */
-               pgaio_submit_staged();
-       }
-
-       return ReadBuffersCanStartIOOnce(buffer, nowait);
-}
-
 /*
  * We track various stats related to buffer hits. Because this is done in a
  * few separate places, this helper exists for convenience.
@@ -1921,6 +1889,7 @@ AsyncReadBuffers(ReadBuffersOperation *operation, int *nblocks_progress)
        IOContext       io_context;
        IOObject        io_object;
        instr_time      io_start;
+       StartBufferIOResult status;
 
        if (persistence == RELPERSISTENCE_TEMP)
        {
@@ -1974,8 +1943,8 @@ AsyncReadBuffers(ReadBuffersOperation *operation, int *nblocks_progress)
        pgstat_prepare_report_checksum_failure(operation->smgr->smgr_rlocator.locator.dbOid);
 
        /*
-        * Get IO handle before ReadBuffersCanStartIO(), as pgaio_io_acquire()
-        * might block, which we don't want after setting IO_IN_PROGRESS.
+        * Get IO handle before StartBufferIO(), as pgaio_io_acquire() might
+        * block, which we don't want after setting IO_IN_PROGRESS.
         *
         * If we need to wait for IO before we can get a handle, submit
         * already-staged IO first, so that other backends don't need to wait.
@@ -2004,31 +1973,41 @@ AsyncReadBuffers(ReadBuffersOperation *operation, int *nblocks_progress)
         * for the outcome: either done, or something went wrong and we will
         * retry.
         */
-       if (!ReadBuffersCanStartIO(buffers[nblocks_done], false))
+       status = StartBufferIO(buffers[nblocks_done], true, true, NULL);
+       if (status != BUFFER_IO_READY_FOR_IO)
        {
-               /*
-                * Someone else has already completed this block, we're done.
-                *
-                * When IO is necessary, ->nblocks_done is updated in
-                * ProcessReadBuffersResult(), but that is not called if no IO is
-                * necessary. Thus update here.
-                */
-               operation->nblocks_done += 1;
+               pgaio_io_release(ioh);
                *nblocks_progress = 1;
+               if (status == BUFFER_IO_ALREADY_DONE)
+               {
+                       /*
+                        * Someone has already completed this block, we're done.
+                        *
+                        * When IO is necessary, ->nblocks_done is updated in
+                        * ProcessReadBuffersResult(), but that is not called if no IO is
+                        * necessary. Thus update here.
+                        */
+                       operation->nblocks_done += 1;
+                       Assert(operation->nblocks_done <= operation->nblocks);
 
-               pgaio_io_release(ioh);
-               pgaio_wref_clear(&operation->io_wref);
+                       Assert(!pgaio_wref_valid(&operation->io_wref));
 
-               /*
-                * Report and track this as a 'hit' for this backend, even though it
-                * must have started out as a miss in PinBufferForBlock(). The other
-                * backend will track this as a 'read'.
-                */
-               TrackBufferHit(io_object, io_context,
-                                          operation->rel, operation->persistence,
-                                          operation->smgr, operation->forknum,
-                                          blocknum);
-               return false;
+                       /*
+                        * Report and track this as a 'hit' for this backend, even though
+                        * it must have started out as a miss in PinBufferForBlock(). The
+                        * other backend will track this as a 'read'.
+                        */
+                       TrackBufferHit(io_object, io_context,
+                                                  operation->rel, operation->persistence,
+                                                  operation->smgr, operation->forknum,
+                                                  blocknum);
+                       return false;
+               }
+
+               /* The IO is already in-progress */
+               Assert(status == BUFFER_IO_IN_PROGRESS);
+
+               return true;
        }
 
        Assert(io_buffers[0] == buffers[nblocks_done]);
@@ -2037,9 +2016,9 @@ AsyncReadBuffers(ReadBuffersOperation *operation, int *nblocks_progress)
 
        /*
         * NB: As little code as possible should be added between the
-        * ReadBuffersCanStartIO() above, the further ReadBuffersCanStartIO()s
-        * below and the smgrstartreadv(), as some of the buffers are now marked
-        * as IO_IN_PROGRESS and will thus cause other backends to wait.
+        * StartBufferIO() above, the further StartBufferIO()s below and the
+        * smgrstartreadv(), as some of the buffers are now marked as
+        * IO_IN_PROGRESS and will thus cause other backends to wait.
         */
 
        /*
@@ -2053,7 +2032,8 @@ AsyncReadBuffers(ReadBuffersOperation *operation, int *nblocks_progress)
                Assert(BufferGetBlockNumber(buffers[i - 1]) ==
                           BufferGetBlockNumber(buffers[i]) - 1);
 
-               if (!ReadBuffersCanStartIO(buffers[i], true))
+               status = StartBufferIO(buffers[i], true, false, NULL);
+               if (status != BUFFER_IO_READY_FOR_IO)
                        break;
 
                Assert(io_buffers[io_buffers_len] == buffers[i]);
@@ -2892,16 +2872,23 @@ ExtendBufferedRelShared(BufferManagerRelation bmr,
                         * We *must* do smgr[zero]extend before succeeding, else the page
                         * will not be reserved by the kernel, and the next P_NEW call
                         * will decide to return the same page.  Clear the BM_VALID bit,
-                        * do StartBufferIO() and proceed.
+                        * do StartSharedBufferIO() and proceed.
                         *
                         * Loop to handle the very small possibility that someone re-sets
-                        * BM_VALID between our clearing it and StartBufferIO inspecting
-                        * it.
+                        * BM_VALID between our clearing it and StartSharedBufferIO
+                        * inspecting it.
                         */
-                       do
+                       while (true)
                        {
+                               StartBufferIOResult sbres;
+
                                pg_atomic_fetch_and_u64(&existing_hdr->state, ~BM_VALID);
-                       } while (!StartBufferIO(existing_hdr, true, false));
+
+                               sbres = StartSharedBufferIO(existing_hdr, true, true, NULL);
+
+                               if (sbres != BUFFER_IO_ALREADY_DONE)
+                                       break;
+                       }
                }
                else
                {
@@ -2927,7 +2914,7 @@ ExtendBufferedRelShared(BufferManagerRelation bmr,
                        LWLockRelease(partition_lock);
 
                        /* XXX: could combine the locked operations in it with the above */
-                       StartBufferIO(victim_buf_hdr, true, false);
+                       StartSharedBufferIO(victim_buf_hdr, true, true, NULL);
                }
        }
 
@@ -4448,7 +4435,7 @@ FlushBuffer(BufferDesc *buf, SMgrRelation reln, IOObject io_object,
         * someone else flushed the buffer before we could, so we need not do
         * anything.
         */
-       if (!StartBufferIO(buf, false, false))
+       if (StartSharedBufferIO(buf, false, true, NULL) == BUFFER_IO_ALREADY_DONE)
                return;
 
        /* Setup error traceback support for ereport() */
@@ -7072,6 +7059,13 @@ WaitIO(BufferDesc *buf)
 {
        ConditionVariable *cv = BufferDescriptorGetIOCV(buf);
 
+       /*
+        * Should never end up here with unsubmitted IO, as no AIO unaware code
+        * may be used while in batch mode and AIO aware code needs to have
+        * submitted all staged IO to avoid deadlocks & slowness.
+        */
+       Assert(!pgaio_have_staged());
+
        ConditionVariablePrepareToSleep(cv);
        for (;;)
        {
@@ -7124,30 +7118,46 @@ WaitIO(BufferDesc *buf)
 }
 
 /*
- * StartBufferIO: begin I/O on this buffer
+ * StartSharedBufferIO: begin I/O on this buffer
  *     (Assumptions)
- *     My process is executing no IO on this buffer
  *     The buffer is Pinned
  *
- * In some scenarios multiple backends could attempt the same I/O operation
- * concurrently.  If someone else has already started I/O on this buffer then
- * we will wait for completion of the IO using WaitIO().
+ * In several scenarios the buffer may already be undergoing I/O in this or
+ * another backend. How to best handle that depends on the caller's
+ * situation. It might be appropriate to wait synchronously (e.g., because the
+ * buffer is about to be invalidated); wait asynchronously, using the buffer's
+ * IO wait reference (e.g., because the caller is doing readahead and doesn't
+ * need the buffer to be ready immediately); or to not wait at all (e.g.,
+ * because the caller is trying to combine IO for this buffer with another
+ * buffer).
+ *
+ * How and whether to wait is controlled by the wait and io_wref
+ * parameters. In detail:
+ *
+ * - If the caller passes a non-NULL io_wref and the buffer has an I/O wait
+ *   reference, the *io_wref is set to the buffer's io_wref and
+ *   BUFFER_IO_IN_PROGRESS is returned. This is done regardless of the wait
+ *   parameter.
  *
- * Input operations are only attempted on buffers that are not BM_VALID,
- * and output operations only on buffers that are BM_VALID and BM_DIRTY,
- * so we can always tell if the work is already done.
+ * - If the caller passes a NULL io_wref (i.e. the caller does not want to
+ *   asynchronously wait for the completion of the IO), wait = false and the
+ *   buffer is undergoing IO, BUFFER_IO_IN_PROGRESS is returned.
  *
- * Returns true if we successfully marked the buffer as I/O busy,
- * false if someone else already did the work.
+ * - If wait = true and either the buffer does not have a wait reference,
+ *   or the caller passes io_wref = NULL, WaitIO() is used to wait for the IO
+ *   to complete.  To avoid the potential of deadlocks and unnecessary delays,
+ *   all staged I/O is submitted before waiting.
  *
- * If nowait is true, then we don't wait for an I/O to be finished by another
- * backend.  In that case, false indicates either that the I/O was already
- * finished, or is still in progress.  This is useful for callers that want to
- * find out if they can perform the I/O as part of a larger operation, without
- * waiting for the answer or distinguishing the reasons why not.
+ * Input operations are only attempted on buffers that are not BM_VALID, and
+ * output operations only on buffers that are BM_VALID and BM_DIRTY, so we can
+ * always tell if the work is already done.  If no I/O is necessary,
+ * BUFFER_IO_ALREADY_DONE is returned.
+ *
+ * If we successfully marked the buffer as BM_IO_IN_PROGRESS,
+ * BUFFER_IO_READY_FOR_IO is returned.
  */
-bool
-StartBufferIO(BufferDesc *buf, bool forInput, bool nowait)
+StartBufferIOResult
+StartSharedBufferIO(BufferDesc *buf, bool forInput, bool wait, PgAioWaitRef *io_wref)
 {
        uint64          buf_state;
 
@@ -7159,10 +7169,42 @@ StartBufferIO(BufferDesc *buf, bool forInput, bool nowait)
 
                if (!(buf_state & BM_IO_IN_PROGRESS))
                        break;
-               UnlockBufHdr(buf);
-               if (nowait)
-                       return false;
-               WaitIO(buf);
+
+               /* Join the existing IO */
+               if (io_wref != NULL && pgaio_wref_valid(&buf->io_wref))
+               {
+                       *io_wref = buf->io_wref;
+                       UnlockBufHdr(buf);
+
+                       return BUFFER_IO_IN_PROGRESS;
+               }
+               else if (!wait)
+               {
+                       UnlockBufHdr(buf);
+                       return BUFFER_IO_IN_PROGRESS;
+               }
+               else
+               {
+                       /*
+                        * With wait = true, we always have to wait if the caller has
+                        * passed io_wref = NULL.
+                        *
+                        * Even with io_wref != NULL, we have to wait if the buffer's wait
+                        * ref is not valid but the IO is in progress, someone else
+                        * started IO but hasn't set the wait ref yet. We have no choice
+                        * but to wait until the IO completes.
+                        */
+                       UnlockBufHdr(buf);
+
+                       /*
+                        * If this backend currently has staged IO, submit it before
+                        * waiting for in-progress IO, to avoid potential deadlocks and
+                        * unnecessary delays.
+                        */
+                       pgaio_submit_staged();
+
+                       WaitIO(buf);
+               }
        }
 
        /* Once we get here, there is definitely no I/O active on this buffer */
@@ -7171,9 +7213,14 @@ StartBufferIO(BufferDesc *buf, bool forInput, bool nowait)
        if (forInput ? (buf_state & BM_VALID) : !(buf_state & BM_DIRTY))
        {
                UnlockBufHdr(buf);
-               return false;
+               return BUFFER_IO_ALREADY_DONE;
        }
 
+       /*
+        * No IO in progress and not already done; we will start IO. It's possible
+        * that the IO was in progress but we're not done, because the IO errored
+        * out. We'll do the IO ourselves.
+        */
        UnlockBufHdrExt(buf, buf_state,
                                        BM_IO_IN_PROGRESS, 0,
                                        0);
@@ -7181,7 +7228,31 @@ StartBufferIO(BufferDesc *buf, bool forInput, bool nowait)
        ResourceOwnerRememberBufferIO(CurrentResourceOwner,
                                                                  BufferDescriptorGetBuffer(buf));
 
-       return true;
+       return BUFFER_IO_READY_FOR_IO;
+}
+
+/*
+ * Wrapper around StartSharedBufferIO / StartLocalBufferIO. Only to be used
+ * when the caller doesn't otherwise need to care about local vs shared. See
+ * StartSharedBufferIO() for details.
+ */
+StartBufferIOResult
+StartBufferIO(Buffer buffer, bool forInput, bool wait, PgAioWaitRef *io_wref)
+{
+       BufferDesc *buf_hdr;
+
+       if (BufferIsLocal(buffer))
+       {
+               buf_hdr = GetLocalBufferDescriptor(-buffer - 1);
+
+               return StartLocalBufferIO(buf_hdr, forInput, wait, io_wref);
+       }
+       else
+       {
+               buf_hdr = GetBufferDescriptor(buffer - 1);
+
+               return StartSharedBufferIO(buf_hdr, forInput, wait, io_wref);
+       }
 }
 
 /*
index b69398c637567d3f5ac69205aa5c04079a8ae96d..396da84b25c55cdfc361278f6ad58840c95c3673 100644 (file)
@@ -189,9 +189,10 @@ FlushLocalBuffer(BufferDesc *bufHdr, SMgrRelation reln)
 
        /*
         * Try to start an I/O operation.  There currently are no reasons for
-        * StartLocalBufferIO to return false, so we raise an error in that case.
+        * StartLocalBufferIO to return anything other than
+        * BUFFER_IO_READY_FOR_IO, so we raise an error in that case.
         */
-       if (!StartLocalBufferIO(bufHdr, false, false))
+       if (StartLocalBufferIO(bufHdr, false, true, NULL) != BUFFER_IO_READY_FOR_IO)
                elog(ERROR, "failed to start write IO on local buffer");
 
        /* Find smgr relation for buffer */
@@ -435,7 +436,7 @@ ExtendBufferedRelLocal(BufferManagerRelation bmr,
                        pg_atomic_unlocked_write_u64(&existing_hdr->state, buf_state);
 
                        /* no need to loop for local buffers */
-                       StartLocalBufferIO(existing_hdr, true, false);
+                       StartLocalBufferIO(existing_hdr, true, true, NULL);
                }
                else
                {
@@ -451,7 +452,7 @@ ExtendBufferedRelLocal(BufferManagerRelation bmr,
 
                        hresult->id = victim_buf_id;
 
-                       StartLocalBufferIO(victim_buf_hdr, true, false);
+                       StartLocalBufferIO(victim_buf_hdr, true, true, NULL);
                }
        }
 
@@ -517,26 +518,41 @@ MarkLocalBufferDirty(Buffer buffer)
 }
 
 /*
- * Like StartBufferIO, but for local buffers
+ * Like StartSharedBufferIO, but for local buffers
  */
-bool
-StartLocalBufferIO(BufferDesc *bufHdr, bool forInput, bool nowait)
+StartBufferIOResult
+StartLocalBufferIO(BufferDesc *bufHdr, bool forInput, bool wait, PgAioWaitRef *io_wref)
 {
        uint64          buf_state;
 
        /*
         * With AIO the buffer could have IO in progress, e.g. when there are two
-        * scans of the same relation. Either wait for the other IO or return
-        * false.
+        * scans of the same relation.  Either wait for the other IO (if wait =
+        * true and io_wref == NULL) or return BUFFER_IO_IN_PROGRESS;
         */
        if (pgaio_wref_valid(&bufHdr->io_wref))
        {
-               PgAioWaitRef iow = bufHdr->io_wref;
+               PgAioWaitRef buf_wref = bufHdr->io_wref;
 
-               if (nowait)
-                       return false;
+               if (io_wref != NULL)
+               {
+                       /* We've already asynchronously started this IO, so join it */
+                       *io_wref = buf_wref;
+                       return BUFFER_IO_IN_PROGRESS;
+               }
 
-               pgaio_wref_wait(&iow);
+               /*
+                * For temp buffers we should never need to wait in
+                * StartLocalBufferIO() when called with io_wref == NULL while there
+                * are staged IOs, as it's not allowed to call code that is not aware
+                * of AIO while in batch mode.
+                */
+               Assert(!pgaio_have_staged());
+
+               if (!wait)
+                       return BUFFER_IO_IN_PROGRESS;
+
+               pgaio_wref_wait(&buf_wref);
        }
 
        /* Once we get here, there is definitely no I/O active on this buffer */
@@ -545,14 +561,14 @@ StartLocalBufferIO(BufferDesc *bufHdr, bool forInput, bool nowait)
        buf_state = pg_atomic_read_u64(&bufHdr->state);
        if (forInput ? (buf_state & BM_VALID) : !(buf_state & BM_DIRTY))
        {
-               return false;
+               return BUFFER_IO_ALREADY_DONE;
        }
 
        /* BM_IO_IN_PROGRESS isn't currently used for local buffers */
 
        /* local buffers don't track IO using resowners */
 
-       return true;
+       return BUFFER_IO_READY_FOR_IO;
 }
 
 /*
index 8d1e16b5d519133ff1a6371002aea6b38581c2a4..ad1b7b2216a4d9d372ba82554ddd4eeca1a04674 100644 (file)
@@ -554,8 +554,25 @@ extern void ScheduleBufferTagForWriteback(WritebackContext *wb_context,
 
 extern void TrackNewBufferPin(Buffer buf);
 
-/* solely to make it easier to write tests */
-extern bool StartBufferIO(BufferDesc *buf, bool forInput, bool nowait);
+/*
+ * Return value for StartBufferIO / StartSharedBufferIO / StartLocalBufferIO.
+ *
+ * When preparing a buffer for I/O and setting BM_IO_IN_PROGRESS, the buffer
+ * may already have I/O in progress or the I/O may have been done by another
+ * backend.  See the documentation of StartSharedBufferIO for more details.
+ */
+typedef enum StartBufferIOResult
+{
+       BUFFER_IO_ALREADY_DONE,
+       BUFFER_IO_IN_PROGRESS,
+       BUFFER_IO_READY_FOR_IO,
+} StartBufferIOResult;
+
+/* the following are exposed to make it easier to write tests */
+extern StartBufferIOResult StartBufferIO(Buffer buffer, bool forInput, bool wait,
+                                                                                PgAioWaitRef *io_wref);
+extern StartBufferIOResult StartSharedBufferIO(BufferDesc *buf, bool forInput, bool wait,
+                                                                                          PgAioWaitRef *io_wref);
 extern void TerminateBufferIO(BufferDesc *buf, bool clear_dirty, uint64 set_flag_bits,
                                                          bool forget_owner, bool release_aio);
 
@@ -600,7 +617,8 @@ extern BlockNumber ExtendBufferedRelLocal(BufferManagerRelation bmr,
 extern void MarkLocalBufferDirty(Buffer buffer);
 extern void TerminateLocalBufferIO(BufferDesc *bufHdr, bool clear_dirty,
                                                                   uint64 set_flag_bits, bool release_aio);
-extern bool StartLocalBufferIO(BufferDesc *bufHdr, bool forInput, bool nowait);
+extern StartBufferIOResult StartLocalBufferIO(BufferDesc *bufHdr, bool forInput,
+                                                                                         bool wait, PgAioWaitRef *io_wref);
 extern void FlushLocalBuffer(BufferDesc *bufHdr, SMgrRelation reln);
 extern void InvalidateLocalBuffer(BufferDesc *bufHdr, bool check_unreferenced);
 extern void DropRelationLocalBuffers(RelFileLocator rlocator,
index 0317c991f9f3fa99ea40ab5edad63dce4be74935..1f9e83690f43b131d1f00cb555adb87310e5973d 100644 (file)
@@ -383,7 +383,7 @@ sub test_startwait_io
                $io_method,
                $psql_a,
                "first StartBufferIO",
-               qq(SELECT buffer_call_start_io($buf_id, for_input=>true, nowait=>false);),
+               qq(SELECT buffer_call_start_io($buf_id, for_input=>true, wait=>true);),
                qr/^t$/,
                qr/^$/);
 
@@ -392,14 +392,14 @@ sub test_startwait_io
                $io_method,
                $psql_a,
                "second StartBufferIO fails, same session",
-               qq(SELECT buffer_call_start_io($buf_id, for_input=>true, nowait=>true);),
+               qq(SELECT buffer_call_start_io($buf_id, for_input=>true, wait=>false);),
                qr/^f$/,
                qr/^$/);
        psql_like(
                $io_method,
                $psql_b,
                "second StartBufferIO fails, other session",
-               qq(SELECT buffer_call_start_io($buf_id, for_input=>true, nowait=>true);),
+               qq(SELECT buffer_call_start_io($buf_id, for_input=>true, wait=>false);),
                qr/^f$/,
                qr/^$/);
 
@@ -409,7 +409,7 @@ sub test_startwait_io
                $node,
                $psql_b,
                "blocking start buffer io",
-               qq(SELECT buffer_call_start_io($buf_id, for_input=>true, nowait=>false);),
+               qq(SELECT buffer_call_start_io($buf_id, for_input=>true, wait=>true);),
                "BufferIo");
 
        # Terminate the IO, without marking it as success, this should trigger the
@@ -438,7 +438,7 @@ sub test_startwait_io
                $io_method,
                $psql_a,
                "blocking buffer io w/ success: first start buffer io",
-               qq(SELECT buffer_call_start_io($buf_id, for_input=>true, nowait=>false);),
+               qq(SELECT buffer_call_start_io($buf_id, for_input=>true, wait=>true);),
                qr/^t$/,
                qr/^$/);
 
@@ -448,7 +448,7 @@ sub test_startwait_io
                $node,
                $psql_b,
                "blocking start buffer io",
-               qq(SELECT buffer_call_start_io($buf_id, for_input=>true, nowait=>false);),
+               qq(SELECT buffer_call_start_io($buf_id, for_input=>true, wait=>true);),
                "BufferIo");
 
        # Terminate the IO, marking it as success
@@ -486,7 +486,7 @@ INSERT INTO tmp_ok SELECT generate_series(1, 10000);
                $io_method,
                $psql_a,
                "first StartLocalBufferIO",
-               qq(SELECT buffer_call_start_io($buf_id, for_input=>true, nowait=>true);),
+               qq(SELECT buffer_call_start_io($buf_id, for_input=>true, wait=>false);),
                qr/^t$/,
                qr/^$/);
 
@@ -497,7 +497,7 @@ INSERT INTO tmp_ok SELECT generate_series(1, 10000);
                $io_method,
                $psql_a,
                "second StartLocalBufferIO succeeds, same session",
-               qq(SELECT buffer_call_start_io($buf_id, for_input=>true, nowait=>true);),
+               qq(SELECT buffer_call_start_io($buf_id, for_input=>true, wait=>false);),
                qr/^t$/,
                qr/^$/);
 
@@ -509,7 +509,7 @@ INSERT INTO tmp_ok SELECT generate_series(1, 10000);
                $io_method,
                $psql_a,
                "StartLocalBufferIO after not marking valid succeeds, same session",
-               qq(SELECT buffer_call_start_io($buf_id, for_input=>true, nowait=>true);),
+               qq(SELECT buffer_call_start_io($buf_id, for_input=>true, wait=>false);),
                qr/^t$/,
                qr/^$/);
 
@@ -524,7 +524,7 @@ INSERT INTO tmp_ok SELECT generate_series(1, 10000);
                $io_method,
                $psql_a,
                "StartLocalBufferIO after marking valid fails",
-               qq(SELECT buffer_call_start_io($buf_id, for_input=>true, nowait=>false);),
+               qq(SELECT buffer_call_start_io($buf_id, for_input=>true, wait=>true);),
                qr/^f$/,
                qr/^$/);
 
@@ -1612,7 +1612,7 @@ read_buffers('$table', 1, 3)|,
                my $buf_id =
                  $psql_b->query_safe(qq|SELECT buffer_create_toy('$table', 3)|);
                $psql_b->query_safe(
-                       qq|SELECT buffer_call_start_io($buf_id, for_input=>true, nowait=>false)|
+                       qq|SELECT buffer_call_start_io($buf_id, for_input=>true, wait=>true)|
                );
 
                query_wait_block(
@@ -1637,7 +1637,7 @@ read_buffers('$table', 1, 3)|,
                $buf_id =
                  $psql_b->query_safe(qq|SELECT buffer_create_toy('$table', 3)|);
                $psql_b->query_safe(
-                       qq|SELECT buffer_call_start_io($buf_id, for_input=>true, nowait=>false)|
+                       qq|SELECT buffer_call_start_io($buf_id, for_input=>true, wait=>true)|
                );
 
                query_wait_block(
index 4a5a379b3c5d52e1940978c59aa65dff40442777..53c83b74e5355e9a87ace4780c5476840f9fc06c 100644 (file)
@@ -45,7 +45,7 @@ CREATE FUNCTION buffer_create_toy(rel regclass, blockno int4)
 RETURNS pg_catalog.int4 STRICT
 AS 'MODULE_PATHNAME' LANGUAGE C;
 
-CREATE FUNCTION buffer_call_start_io(buffer int, for_input bool, nowait bool)
+CREATE FUNCTION buffer_call_start_io(buffer int, for_input bool, wait bool)
 RETURNS pg_catalog.bool STRICT
 AS 'MODULE_PATHNAME' LANGUAGE C;
 
index eeeab9054c8432369336f65f2b4da5a8d3ae785e..a0ad2313661223ab3f3c05d0a9ee09c9e3a5373b 100644 (file)
@@ -434,13 +434,13 @@ read_rel_block_ll(PG_FUNCTION_ARGS)
        if (RelationUsesLocalBuffers(rel))
        {
                for (int i = 0; i < nblocks; i++)
-                       StartLocalBufferIO(buf_hdrs[i], true, false);
+                       StartLocalBufferIO(buf_hdrs[i], true, true, NULL);
                pgaio_io_set_flag(ioh, PGAIO_HF_REFERENCES_LOCAL);
        }
        else
        {
                for (int i = 0; i < nblocks; i++)
-                       StartBufferIO(buf_hdrs[i], true, false);
+                       StartSharedBufferIO(buf_hdrs[i], true, true, NULL);
        }
 
        pgaio_io_set_handle_data_32(ioh, (uint32 *) bufs, nblocks);
@@ -622,15 +622,18 @@ buffer_call_start_io(PG_FUNCTION_ARGS)
 {
        Buffer          buf = PG_GETARG_INT32(0);
        bool            for_input = PG_GETARG_BOOL(1);
-       bool            nowait = PG_GETARG_BOOL(2);
+       bool            wait = PG_GETARG_BOOL(2);
+       StartBufferIOResult result;
        bool            can_start;
 
        if (BufferIsLocal(buf))
-               can_start = StartLocalBufferIO(GetLocalBufferDescriptor(-buf - 1),
-                                                                          for_input, nowait);
+               result = StartLocalBufferIO(GetLocalBufferDescriptor(-buf - 1),
+                                                                       for_input, wait, NULL);
        else
-               can_start = StartBufferIO(GetBufferDescriptor(buf - 1),
-                                                                 for_input, nowait);
+               result = StartSharedBufferIO(GetBufferDescriptor(buf - 1),
+                                                                        for_input, wait, NULL);
+
+       can_start = result == BUFFER_IO_READY_FOR_IO;
 
        /*
         * For tests we don't want the resowner release preventing us from
index b2c7c9e6f7c81c9610caf4b3ab15823706653b72..e3c1007abdf5f9eb9a3b10f849d00a206fcf8390 100644 (file)
@@ -2946,6 +2946,7 @@ SplitTextOutputData
 SplitVar
 StackElem
 StakindFlags
+StartBufferIOResult
 StartDataPtrType
 StartLOPtrType
 StartLOsPtrType