From: Andres Freund Date: Fri, 27 Mar 2026 23:02:23 +0000 (-0400) Subject: bufmgr: Improve StartBufferIO interface X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=74eafeab1a576620879e889ec70bf826e3e7fce8;p=thirdparty%2Fpostgresql.git bufmgr: Improve StartBufferIO interface Until now StartBufferIO() had a few weaknesses: - As it did not submit staged IOs, it was not safe to call StartBufferIO() where there was a potential for unsubmitted IO, which required AsyncReadBuffers() to use a wrapper (ReadBuffersCanStartIO()) around StartBufferIO(). - With nowait = true, the boolean return value did not allow distinguishing between no IO being necessary and having to wait, which would lead ReadBuffersCanStartIO() to unnecessarily submit staged IO. - Several callers needed to handle both local and shared buffers, requiring the caller to differentiate between StartBufferIO() and StartLocalBufferIO() - In a future commit some callers of StartBufferIO() want the BufferDesc's io_wref to be returned, to asynchronously wait for in-progress IO - Indicating whether to wait with the nowait parameter was somewhat confusing compared to a wait parameter Address these issues as follows: - StartBufferIO() is renamed to StartSharedBufferIO() - A new StartBufferIO() is introduced that supports both shared and local buffers - The boolean return value has been replaced with an enum, indicating whether the IO is already done, already in progress or that the buffer has been readied for IO - A new PgAioWaitRef * argument allows the caller to get the wait reference if desired. All current callers pass NULL, a user of this will be introduced subsequently - Instead of the nowait argument there now is wait This probably would not have been worthwhile on its own, but since all these lines needed to be touched anyway... 
Author: Andres Freund Author: Melanie Plageman Discussion: https://postgr.es/m/zljergweqti7x67lg5ije2rzjusie37nslsnkjkkby4laqqbfw@3p3zu522yykv --- diff --git a/src/backend/storage/buffer/bufmgr.c b/src/backend/storage/buffer/bufmgr.c index da87c85a079..4d8c3b5d37b 100644 --- a/src/backend/storage/buffer/bufmgr.c +++ b/src/backend/storage/buffer/bufmgr.c @@ -1148,6 +1148,7 @@ ZeroAndLockBuffer(Buffer buffer, ReadBufferMode mode, bool already_valid) BufferDesc *bufHdr; bool need_to_zero; bool isLocalBuf = BufferIsLocal(buffer); + StartBufferIOResult sbres; Assert(mode == RBM_ZERO_AND_LOCK || mode == RBM_ZERO_AND_CLEANUP_LOCK); @@ -1159,24 +1160,30 @@ ZeroAndLockBuffer(Buffer buffer, ReadBufferMode mode, bool already_valid) */ need_to_zero = false; } - else if (isLocalBuf) - { - /* Simple case for non-shared buffers. */ - bufHdr = GetLocalBufferDescriptor(-buffer - 1); - need_to_zero = StartLocalBufferIO(bufHdr, true, false); - } else { - /* - * Take BM_IO_IN_PROGRESS, or discover that BM_VALID has been set - * concurrently. Even though we aren't doing I/O, that ensures that - * we don't zero a page that someone else has pinned. An exclusive - * content lock wouldn't be enough, because readers are allowed to - * drop the content lock after determining that a tuple is visible - * (see buffer access rules in README). - */ - bufHdr = GetBufferDescriptor(buffer - 1); - need_to_zero = StartBufferIO(bufHdr, true, false); + if (isLocalBuf) + { + /* Simple case for non-shared buffers. */ + bufHdr = GetLocalBufferDescriptor(-buffer - 1); + sbres = StartLocalBufferIO(bufHdr, true, true, NULL); + } + else + { + /* + * Take BM_IO_IN_PROGRESS, or discover that BM_VALID has been set + * concurrently. Even though we aren't doing I/O, that ensures + * that we don't zero a page that someone else has pinned. 
An + * exclusive content lock wouldn't be enough, because readers are + * allowed to drop the content lock after determining that a tuple + * is visible (see buffer access rules in README). + */ + bufHdr = GetBufferDescriptor(buffer - 1); + sbres = StartSharedBufferIO(bufHdr, true, true, NULL); + } + + Assert(sbres != BUFFER_IO_IN_PROGRESS); + need_to_zero = sbres == BUFFER_IO_READY_FOR_IO; } if (need_to_zero) @@ -1659,45 +1666,6 @@ CheckReadBuffersOperation(ReadBuffersOperation *operation, bool is_complete) #endif } -/* helper for ReadBuffersCanStartIO(), to avoid repetition */ -static inline bool -ReadBuffersCanStartIOOnce(Buffer buffer, bool nowait) -{ - if (BufferIsLocal(buffer)) - return StartLocalBufferIO(GetLocalBufferDescriptor(-buffer - 1), - true, nowait); - else - return StartBufferIO(GetBufferDescriptor(buffer - 1), true, nowait); -} - -/* - * Helper for AsyncReadBuffers that tries to get the buffer ready for IO. - */ -static inline bool -ReadBuffersCanStartIO(Buffer buffer, bool nowait) -{ - /* - * If this backend currently has staged IO, we need to submit the pending - * IO before waiting for the right to issue IO, to avoid the potential for - * deadlocks (and, more commonly, unnecessary delays for other backends). - */ - if (!nowait && pgaio_have_staged()) - { - if (ReadBuffersCanStartIOOnce(buffer, true)) - return true; - - /* - * Unfortunately StartBufferIO() returning false doesn't allow to - * distinguish between the buffer already being valid and IO already - * being in progress. Since IO already being in progress is quite - * rare, this approach seems fine. - */ - pgaio_submit_staged(); - } - - return ReadBuffersCanStartIOOnce(buffer, nowait); -} - /* * We track various stats related to buffer hits. Because this is done in a * few separate places, this helper exists for convenience. 
@@ -1921,6 +1889,7 @@ AsyncReadBuffers(ReadBuffersOperation *operation, int *nblocks_progress) IOContext io_context; IOObject io_object; instr_time io_start; + StartBufferIOResult status; if (persistence == RELPERSISTENCE_TEMP) { @@ -1974,8 +1943,8 @@ AsyncReadBuffers(ReadBuffersOperation *operation, int *nblocks_progress) pgstat_prepare_report_checksum_failure(operation->smgr->smgr_rlocator.locator.dbOid); /* - * Get IO handle before ReadBuffersCanStartIO(), as pgaio_io_acquire() - * might block, which we don't want after setting IO_IN_PROGRESS. + * Get IO handle before StartBufferIO(), as pgaio_io_acquire() might + * block, which we don't want after setting IO_IN_PROGRESS. * * If we need to wait for IO before we can get a handle, submit * already-staged IO first, so that other backends don't need to wait. @@ -2004,31 +1973,41 @@ AsyncReadBuffers(ReadBuffersOperation *operation, int *nblocks_progress) * for the outcome: either done, or something went wrong and we will * retry. */ - if (!ReadBuffersCanStartIO(buffers[nblocks_done], false)) + status = StartBufferIO(buffers[nblocks_done], true, true, NULL); + if (status != BUFFER_IO_READY_FOR_IO) { - /* - * Someone else has already completed this block, we're done. - * - * When IO is necessary, ->nblocks_done is updated in - * ProcessReadBuffersResult(), but that is not called if no IO is - * necessary. Thus update here. - */ - operation->nblocks_done += 1; + pgaio_io_release(ioh); *nblocks_progress = 1; + if (status == BUFFER_IO_ALREADY_DONE) + { + /* + * Someone has already completed this block, we're done. + * + * When IO is necessary, ->nblocks_done is updated in + * ProcessReadBuffersResult(), but that is not called if no IO is + * necessary. Thus update here. 
+ */ + operation->nblocks_done += 1; + Assert(operation->nblocks_done <= operation->nblocks); - pgaio_io_release(ioh); - pgaio_wref_clear(&operation->io_wref); + Assert(!pgaio_wref_valid(&operation->io_wref)); - /* - * Report and track this as a 'hit' for this backend, even though it - * must have started out as a miss in PinBufferForBlock(). The other - * backend will track this as a 'read'. - */ - TrackBufferHit(io_object, io_context, - operation->rel, operation->persistence, - operation->smgr, operation->forknum, - blocknum); - return false; + /* + * Report and track this as a 'hit' for this backend, even though + * it must have started out as a miss in PinBufferForBlock(). The + * other backend will track this as a 'read'. + */ + TrackBufferHit(io_object, io_context, + operation->rel, operation->persistence, + operation->smgr, operation->forknum, + blocknum); + return false; + } + + /* The IO is already in-progress */ + Assert(status == BUFFER_IO_IN_PROGRESS); + + return true; } Assert(io_buffers[0] == buffers[nblocks_done]); @@ -2037,9 +2016,9 @@ AsyncReadBuffers(ReadBuffersOperation *operation, int *nblocks_progress) /* * NB: As little code as possible should be added between the - * ReadBuffersCanStartIO() above, the further ReadBuffersCanStartIO()s - * below and the smgrstartreadv(), as some of the buffers are now marked - * as IO_IN_PROGRESS and will thus cause other backends to wait. + * StartBufferIO() above, the further StartBufferIO()s below and the + * smgrstartreadv(), as some of the buffers are now marked as + * IO_IN_PROGRESS and will thus cause other backends to wait. 
*/ /* @@ -2053,7 +2032,8 @@ AsyncReadBuffers(ReadBuffersOperation *operation, int *nblocks_progress) Assert(BufferGetBlockNumber(buffers[i - 1]) == BufferGetBlockNumber(buffers[i]) - 1); - if (!ReadBuffersCanStartIO(buffers[i], true)) + status = StartBufferIO(buffers[i], true, false, NULL); + if (status != BUFFER_IO_READY_FOR_IO) break; Assert(io_buffers[io_buffers_len] == buffers[i]); @@ -2892,16 +2872,23 @@ ExtendBufferedRelShared(BufferManagerRelation bmr, * We *must* do smgr[zero]extend before succeeding, else the page * will not be reserved by the kernel, and the next P_NEW call * will decide to return the same page. Clear the BM_VALID bit, - * do StartBufferIO() and proceed. + * do StartSharedBufferIO() and proceed. * * Loop to handle the very small possibility that someone re-sets - * BM_VALID between our clearing it and StartBufferIO inspecting - * it. + * BM_VALID between our clearing it and StartSharedBufferIO + * inspecting it. */ - do + while (true) { + StartBufferIOResult sbres; + pg_atomic_fetch_and_u64(&existing_hdr->state, ~BM_VALID); - } while (!StartBufferIO(existing_hdr, true, false)); + + sbres = StartSharedBufferIO(existing_hdr, true, true, NULL); + + if (sbres != BUFFER_IO_ALREADY_DONE) + break; + } } else { @@ -2927,7 +2914,7 @@ ExtendBufferedRelShared(BufferManagerRelation bmr, LWLockRelease(partition_lock); /* XXX: could combine the locked operations in it with the above */ - StartBufferIO(victim_buf_hdr, true, false); + StartSharedBufferIO(victim_buf_hdr, true, true, NULL); } } @@ -4448,7 +4435,7 @@ FlushBuffer(BufferDesc *buf, SMgrRelation reln, IOObject io_object, * someone else flushed the buffer before we could, so we need not do * anything. 
*/ - if (!StartBufferIO(buf, false, false)) + if (StartSharedBufferIO(buf, false, true, NULL) == BUFFER_IO_ALREADY_DONE) return; /* Setup error traceback support for ereport() */ @@ -7072,6 +7059,13 @@ WaitIO(BufferDesc *buf) { ConditionVariable *cv = BufferDescriptorGetIOCV(buf); + /* + * Should never end up here with unsubmitted IO, as no AIO unaware code + * may be used while in batch mode and AIO aware code needs to have + * submitted all staged IO to avoid deadlocks & slowness. + */ + Assert(!pgaio_have_staged()); + ConditionVariablePrepareToSleep(cv); for (;;) { @@ -7124,30 +7118,46 @@ WaitIO(BufferDesc *buf) } /* - * StartBufferIO: begin I/O on this buffer + * StartSharedBufferIO: begin I/O on this buffer * (Assumptions) - * My process is executing no IO on this buffer * The buffer is Pinned * - * In some scenarios multiple backends could attempt the same I/O operation - * concurrently. If someone else has already started I/O on this buffer then - * we will wait for completion of the IO using WaitIO(). + * In several scenarios the buffer may already be undergoing I/O in this or + * another backend. How to best handle that depends on the caller's + * situation. It might be appropriate to wait synchronously (e.g., because the + * buffer is about to be invalidated); wait asynchronously, using the buffer's + * IO wait reference (e.g., because the caller is doing readahead and doesn't + * need the buffer to be ready immediately); or to not wait at all (e.g., + * because the caller is trying to combine IO for this buffer with another + * buffer). + * + * How and whether to wait is controlled by the wait and io_wref + * parameters. In detail: + * + * - If the caller passes a non-NULL io_wref and the buffer has an I/O wait + * reference, the *io_wref is set to the buffer's io_wref and + * BUFFER_IO_IN_PROGRESS is returned. This is done regardless of the wait + * parameter. 
* - * Input operations are only attempted on buffers that are not BM_VALID, - * and output operations only on buffers that are BM_VALID and BM_DIRTY, - * so we can always tell if the work is already done. + * - If the caller passes a NULL io_wref (i.e. the caller does not want to + * asynchronously wait for the completion of the IO), wait = false and the + * buffer is undergoing IO, BUFFER_IO_IN_PROGRESS is returned. * - * Returns true if we successfully marked the buffer as I/O busy, - * false if someone else already did the work. + * - If wait = true and either the buffer does not have a wait reference, + * or the caller passes io_wref = NULL, WaitIO() is used to wait for the IO + * to complete. To avoid the potential of deadlocks and unnecessary delays, + * all staged I/O is submitted before waiting. * - * If nowait is true, then we don't wait for an I/O to be finished by another - * backend. In that case, false indicates either that the I/O was already - * finished, or is still in progress. This is useful for callers that want to - * find out if they can perform the I/O as part of a larger operation, without - * waiting for the answer or distinguishing the reasons why not. + * Input operations are only attempted on buffers that are not BM_VALID, and + * output operations only on buffers that are BM_VALID and BM_DIRTY, so we can + * always tell if the work is already done. If no I/O is necessary, + * BUFFER_IO_ALREADY_DONE is returned. + * + * If we successfully marked the buffer as BM_IO_IN_PROGRESS, + * BUFFER_IO_READY_FOR_IO is returned. 
*/ -bool -StartBufferIO(BufferDesc *buf, bool forInput, bool nowait) +StartBufferIOResult +StartSharedBufferIO(BufferDesc *buf, bool forInput, bool wait, PgAioWaitRef *io_wref) { uint64 buf_state; @@ -7159,10 +7169,42 @@ StartBufferIO(BufferDesc *buf, bool forInput, bool nowait) if (!(buf_state & BM_IO_IN_PROGRESS)) break; - UnlockBufHdr(buf); - if (nowait) - return false; - WaitIO(buf); + + /* Join the existing IO */ + if (io_wref != NULL && pgaio_wref_valid(&buf->io_wref)) + { + *io_wref = buf->io_wref; + UnlockBufHdr(buf); + + return BUFFER_IO_IN_PROGRESS; + } + else if (!wait) + { + UnlockBufHdr(buf); + return BUFFER_IO_IN_PROGRESS; + } + else + { + /* + * With wait = true, we always have to wait if the caller has + * passed io_wref = NULL. + * + * Even with io_wref != NULL, we have to wait if the buffer's wait + * ref is not valid but the IO is in progress, someone else + * started IO but hasn't set the wait ref yet. We have no choice + * but to wait until the IO completes. + */ + UnlockBufHdr(buf); + + /* + * If this backend currently has staged IO, submit it before + * waiting for in-progress IO, to avoid potential deadlocks and + * unnecessary delays. + */ + pgaio_submit_staged(); + + WaitIO(buf); + } } /* Once we get here, there is definitely no I/O active on this buffer */ @@ -7171,9 +7213,14 @@ StartBufferIO(BufferDesc *buf, bool forInput, bool nowait) if (forInput ? (buf_state & BM_VALID) : !(buf_state & BM_DIRTY)) { UnlockBufHdr(buf); - return false; + return BUFFER_IO_ALREADY_DONE; } + /* + * No IO in progress and not already done; we will start IO. It's possible + * that the IO was in progress but we're not done, because the IO errored + * out. We'll do the IO ourselves. 
+ */ UnlockBufHdrExt(buf, buf_state, BM_IO_IN_PROGRESS, 0, 0); @@ -7181,7 +7228,31 @@ StartBufferIO(BufferDesc *buf, bool forInput, bool nowait) ResourceOwnerRememberBufferIO(CurrentResourceOwner, BufferDescriptorGetBuffer(buf)); - return true; + return BUFFER_IO_READY_FOR_IO; +} + +/* + * Wrapper around StartSharedBufferIO / StartLocalBufferIO. Only to be used + * when the caller doesn't otherwise need to care about local vs shared. See + * StartSharedBufferIO() for details. + */ +StartBufferIOResult +StartBufferIO(Buffer buffer, bool forInput, bool wait, PgAioWaitRef *io_wref) +{ + BufferDesc *buf_hdr; + + if (BufferIsLocal(buffer)) + { + buf_hdr = GetLocalBufferDescriptor(-buffer - 1); + + return StartLocalBufferIO(buf_hdr, forInput, wait, io_wref); + } + else + { + buf_hdr = GetBufferDescriptor(buffer - 1); + + return StartSharedBufferIO(buf_hdr, forInput, wait, io_wref); + } } /* diff --git a/src/backend/storage/buffer/localbuf.c b/src/backend/storage/buffer/localbuf.c index b69398c6375..396da84b25c 100644 --- a/src/backend/storage/buffer/localbuf.c +++ b/src/backend/storage/buffer/localbuf.c @@ -189,9 +189,10 @@ FlushLocalBuffer(BufferDesc *bufHdr, SMgrRelation reln) /* * Try to start an I/O operation. There currently are no reasons for - * StartLocalBufferIO to return false, so we raise an error in that case. + * StartLocalBufferIO to return anything other than + * BUFFER_IO_READY_FOR_IO, so we raise an error in that case. 
*/ - if (!StartLocalBufferIO(bufHdr, false, false)) + if (StartLocalBufferIO(bufHdr, false, true, NULL) != BUFFER_IO_READY_FOR_IO) elog(ERROR, "failed to start write IO on local buffer"); /* Find smgr relation for buffer */ @@ -435,7 +436,7 @@ ExtendBufferedRelLocal(BufferManagerRelation bmr, pg_atomic_unlocked_write_u64(&existing_hdr->state, buf_state); /* no need to loop for local buffers */ - StartLocalBufferIO(existing_hdr, true, false); + StartLocalBufferIO(existing_hdr, true, true, NULL); } else { @@ -451,7 +452,7 @@ ExtendBufferedRelLocal(BufferManagerRelation bmr, hresult->id = victim_buf_id; - StartLocalBufferIO(victim_buf_hdr, true, false); + StartLocalBufferIO(victim_buf_hdr, true, true, NULL); } } @@ -517,26 +518,41 @@ MarkLocalBufferDirty(Buffer buffer) } /* - * Like StartBufferIO, but for local buffers + * Like StartSharedBufferIO, but for local buffers */ -bool -StartLocalBufferIO(BufferDesc *bufHdr, bool forInput, bool nowait) +StartBufferIOResult +StartLocalBufferIO(BufferDesc *bufHdr, bool forInput, bool wait, PgAioWaitRef *io_wref) { uint64 buf_state; /* * With AIO the buffer could have IO in progress, e.g. when there are two - * scans of the same relation. Either wait for the other IO or return - * false. + * scans of the same relation. Either wait for the other IO (if wait = + * true and io_wref == NULL) or return BUFFER_IO_IN_PROGRESS; */ if (pgaio_wref_valid(&bufHdr->io_wref)) { - PgAioWaitRef iow = bufHdr->io_wref; + PgAioWaitRef buf_wref = bufHdr->io_wref; - if (nowait) - return false; + if (io_wref != NULL) + { + /* We've already asynchronously started this IO, so join it */ + *io_wref = buf_wref; + return BUFFER_IO_IN_PROGRESS; + } - pgaio_wref_wait(&iow); + /* + * For temp buffers we should never need to wait in + * StartLocalBufferIO() when called with io_wref == NULL while there + * are staged IOs, as it's not allowed to call code that is not aware + * of AIO while in batch mode. 
+ */ + Assert(!pgaio_have_staged()); + + if (!wait) + return BUFFER_IO_IN_PROGRESS; + + pgaio_wref_wait(&buf_wref); } /* Once we get here, there is definitely no I/O active on this buffer */ @@ -545,14 +561,14 @@ StartLocalBufferIO(BufferDesc *bufHdr, bool forInput, bool nowait) buf_state = pg_atomic_read_u64(&bufHdr->state); if (forInput ? (buf_state & BM_VALID) : !(buf_state & BM_DIRTY)) { - return false; + return BUFFER_IO_ALREADY_DONE; } /* BM_IO_IN_PROGRESS isn't currently used for local buffers */ /* local buffers don't track IO using resowners */ - return true; + return BUFFER_IO_READY_FOR_IO; } /* diff --git a/src/include/storage/buf_internals.h b/src/include/storage/buf_internals.h index 8d1e16b5d51..ad1b7b2216a 100644 --- a/src/include/storage/buf_internals.h +++ b/src/include/storage/buf_internals.h @@ -554,8 +554,25 @@ extern void ScheduleBufferTagForWriteback(WritebackContext *wb_context, extern void TrackNewBufferPin(Buffer buf); -/* solely to make it easier to write tests */ -extern bool StartBufferIO(BufferDesc *buf, bool forInput, bool nowait); +/* + * Return value for StartBufferIO / StartSharedBufferIO / StartLocalBufferIO. + * + * When preparing a buffer for I/O and setting BM_IO_IN_PROGRESS, the buffer + * may already have I/O in progress or the I/O may have been done by another + * backend. See the documentation of StartSharedBufferIO for more details. 
+ */ +typedef enum StartBufferIOResult +{ + BUFFER_IO_ALREADY_DONE, + BUFFER_IO_IN_PROGRESS, + BUFFER_IO_READY_FOR_IO, +} StartBufferIOResult; + +/* the following are exposed to make it easier to write tests */ +extern StartBufferIOResult StartBufferIO(Buffer buffer, bool forInput, bool wait, + PgAioWaitRef *io_wref); +extern StartBufferIOResult StartSharedBufferIO(BufferDesc *buf, bool forInput, bool wait, + PgAioWaitRef *io_wref); extern void TerminateBufferIO(BufferDesc *buf, bool clear_dirty, uint64 set_flag_bits, bool forget_owner, bool release_aio); @@ -600,7 +617,8 @@ extern BlockNumber ExtendBufferedRelLocal(BufferManagerRelation bmr, extern void MarkLocalBufferDirty(Buffer buffer); extern void TerminateLocalBufferIO(BufferDesc *bufHdr, bool clear_dirty, uint64 set_flag_bits, bool release_aio); -extern bool StartLocalBufferIO(BufferDesc *bufHdr, bool forInput, bool nowait); +extern StartBufferIOResult StartLocalBufferIO(BufferDesc *bufHdr, bool forInput, + bool wait, PgAioWaitRef *io_wref); extern void FlushLocalBuffer(BufferDesc *bufHdr, SMgrRelation reln); extern void InvalidateLocalBuffer(BufferDesc *bufHdr, bool check_unreferenced); extern void DropRelationLocalBuffers(RelFileLocator rlocator, diff --git a/src/test/modules/test_aio/t/001_aio.pl b/src/test/modules/test_aio/t/001_aio.pl index 0317c991f9f..1f9e83690f4 100644 --- a/src/test/modules/test_aio/t/001_aio.pl +++ b/src/test/modules/test_aio/t/001_aio.pl @@ -383,7 +383,7 @@ sub test_startwait_io $io_method, $psql_a, "first StartBufferIO", - qq(SELECT buffer_call_start_io($buf_id, for_input=>true, nowait=>false);), + qq(SELECT buffer_call_start_io($buf_id, for_input=>true, wait=>true);), qr/^t$/, qr/^$/); @@ -392,14 +392,14 @@ sub test_startwait_io $io_method, $psql_a, "second StartBufferIO fails, same session", - qq(SELECT buffer_call_start_io($buf_id, for_input=>true, nowait=>true);), + qq(SELECT buffer_call_start_io($buf_id, for_input=>true, wait=>false);), qr/^f$/, qr/^$/); psql_like( 
$io_method, $psql_b, "second StartBufferIO fails, other session", - qq(SELECT buffer_call_start_io($buf_id, for_input=>true, nowait=>true);), + qq(SELECT buffer_call_start_io($buf_id, for_input=>true, wait=>false);), qr/^f$/, qr/^$/); @@ -409,7 +409,7 @@ sub test_startwait_io $node, $psql_b, "blocking start buffer io", - qq(SELECT buffer_call_start_io($buf_id, for_input=>true, nowait=>false);), + qq(SELECT buffer_call_start_io($buf_id, for_input=>true, wait=>true);), "BufferIo"); # Terminate the IO, without marking it as success, this should trigger the @@ -438,7 +438,7 @@ sub test_startwait_io $io_method, $psql_a, "blocking buffer io w/ success: first start buffer io", - qq(SELECT buffer_call_start_io($buf_id, for_input=>true, nowait=>false);), + qq(SELECT buffer_call_start_io($buf_id, for_input=>true, wait=>true);), qr/^t$/, qr/^$/); @@ -448,7 +448,7 @@ sub test_startwait_io $node, $psql_b, "blocking start buffer io", - qq(SELECT buffer_call_start_io($buf_id, for_input=>true, nowait=>false);), + qq(SELECT buffer_call_start_io($buf_id, for_input=>true, wait=>true);), "BufferIo"); # Terminate the IO, marking it as success @@ -486,7 +486,7 @@ INSERT INTO tmp_ok SELECT generate_series(1, 10000); $io_method, $psql_a, "first StartLocalBufferIO", - qq(SELECT buffer_call_start_io($buf_id, for_input=>true, nowait=>true);), + qq(SELECT buffer_call_start_io($buf_id, for_input=>true, wait=>false);), qr/^t$/, qr/^$/); @@ -497,7 +497,7 @@ INSERT INTO tmp_ok SELECT generate_series(1, 10000); $io_method, $psql_a, "second StartLocalBufferIO succeeds, same session", - qq(SELECT buffer_call_start_io($buf_id, for_input=>true, nowait=>true);), + qq(SELECT buffer_call_start_io($buf_id, for_input=>true, wait=>false);), qr/^t$/, qr/^$/); @@ -509,7 +509,7 @@ INSERT INTO tmp_ok SELECT generate_series(1, 10000); $io_method, $psql_a, "StartLocalBufferIO after not marking valid succeeds, same session", - qq(SELECT buffer_call_start_io($buf_id, for_input=>true, nowait=>true);), + qq(SELECT 
buffer_call_start_io($buf_id, for_input=>true, wait=>false);), qr/^t$/, qr/^$/); @@ -524,7 +524,7 @@ INSERT INTO tmp_ok SELECT generate_series(1, 10000); $io_method, $psql_a, "StartLocalBufferIO after marking valid fails", - qq(SELECT buffer_call_start_io($buf_id, for_input=>true, nowait=>false);), + qq(SELECT buffer_call_start_io($buf_id, for_input=>true, wait=>true);), qr/^f$/, qr/^$/); @@ -1612,7 +1612,7 @@ read_buffers('$table', 1, 3)|, my $buf_id = $psql_b->query_safe(qq|SELECT buffer_create_toy('$table', 3)|); $psql_b->query_safe( - qq|SELECT buffer_call_start_io($buf_id, for_input=>true, nowait=>false)| + qq|SELECT buffer_call_start_io($buf_id, for_input=>true, wait=>true)| ); query_wait_block( @@ -1637,7 +1637,7 @@ read_buffers('$table', 1, 3)|, $buf_id = $psql_b->query_safe(qq|SELECT buffer_create_toy('$table', 3)|); $psql_b->query_safe( - qq|SELECT buffer_call_start_io($buf_id, for_input=>true, nowait=>false)| + qq|SELECT buffer_call_start_io($buf_id, for_input=>true, wait=>true)| ); query_wait_block( diff --git a/src/test/modules/test_aio/test_aio--1.0.sql b/src/test/modules/test_aio/test_aio--1.0.sql index 4a5a379b3c5..53c83b74e53 100644 --- a/src/test/modules/test_aio/test_aio--1.0.sql +++ b/src/test/modules/test_aio/test_aio--1.0.sql @@ -45,7 +45,7 @@ CREATE FUNCTION buffer_create_toy(rel regclass, blockno int4) RETURNS pg_catalog.int4 STRICT AS 'MODULE_PATHNAME' LANGUAGE C; -CREATE FUNCTION buffer_call_start_io(buffer int, for_input bool, nowait bool) +CREATE FUNCTION buffer_call_start_io(buffer int, for_input bool, wait bool) RETURNS pg_catalog.bool STRICT AS 'MODULE_PATHNAME' LANGUAGE C; diff --git a/src/test/modules/test_aio/test_aio.c b/src/test/modules/test_aio/test_aio.c index eeeab9054c8..a0ad2313661 100644 --- a/src/test/modules/test_aio/test_aio.c +++ b/src/test/modules/test_aio/test_aio.c @@ -434,13 +434,13 @@ read_rel_block_ll(PG_FUNCTION_ARGS) if (RelationUsesLocalBuffers(rel)) { for (int i = 0; i < nblocks; i++) - 
StartLocalBufferIO(buf_hdrs[i], true, false); + StartLocalBufferIO(buf_hdrs[i], true, true, NULL); pgaio_io_set_flag(ioh, PGAIO_HF_REFERENCES_LOCAL); } else { for (int i = 0; i < nblocks; i++) - StartBufferIO(buf_hdrs[i], true, false); + StartSharedBufferIO(buf_hdrs[i], true, true, NULL); } pgaio_io_set_handle_data_32(ioh, (uint32 *) bufs, nblocks); @@ -622,15 +622,18 @@ buffer_call_start_io(PG_FUNCTION_ARGS) { Buffer buf = PG_GETARG_INT32(0); bool for_input = PG_GETARG_BOOL(1); - bool nowait = PG_GETARG_BOOL(2); + bool wait = PG_GETARG_BOOL(2); + StartBufferIOResult result; bool can_start; if (BufferIsLocal(buf)) - can_start = StartLocalBufferIO(GetLocalBufferDescriptor(-buf - 1), - for_input, nowait); + result = StartLocalBufferIO(GetLocalBufferDescriptor(-buf - 1), + for_input, wait, NULL); else - can_start = StartBufferIO(GetBufferDescriptor(buf - 1), - for_input, nowait); + result = StartSharedBufferIO(GetBufferDescriptor(buf - 1), + for_input, wait, NULL); + + can_start = result == BUFFER_IO_READY_FOR_IO; /* * For tests we don't want the resowner release preventing us from diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list index b2c7c9e6f7c..e3c1007abdf 100644 --- a/src/tools/pgindent/typedefs.list +++ b/src/tools/pgindent/typedefs.list @@ -2946,6 +2946,7 @@ SplitTextOutputData SplitVar StackElem StakindFlags +StartBufferIOResult StartDataPtrType StartLOPtrType StartLOsPtrType