BufferDesc *bufHdr;
bool need_to_zero;
bool isLocalBuf = BufferIsLocal(buffer);
+ StartBufferIOResult sbres;
Assert(mode == RBM_ZERO_AND_LOCK || mode == RBM_ZERO_AND_CLEANUP_LOCK);
*/
need_to_zero = false;
}
- else if (isLocalBuf)
- {
- /* Simple case for non-shared buffers. */
- bufHdr = GetLocalBufferDescriptor(-buffer - 1);
- need_to_zero = StartLocalBufferIO(bufHdr, true, false);
- }
else
{
- /*
- * Take BM_IO_IN_PROGRESS, or discover that BM_VALID has been set
- * concurrently. Even though we aren't doing I/O, that ensures that
- * we don't zero a page that someone else has pinned. An exclusive
- * content lock wouldn't be enough, because readers are allowed to
- * drop the content lock after determining that a tuple is visible
- * (see buffer access rules in README).
- */
- bufHdr = GetBufferDescriptor(buffer - 1);
- need_to_zero = StartBufferIO(bufHdr, true, false);
+ if (isLocalBuf)
+ {
+ /* Simple case for non-shared buffers. */
+ bufHdr = GetLocalBufferDescriptor(-buffer - 1);
+ sbres = StartLocalBufferIO(bufHdr, true, true, NULL);
+ }
+ else
+ {
+ /*
+ * Take BM_IO_IN_PROGRESS, or discover that BM_VALID has been set
+ * concurrently. Even though we aren't doing I/O, that ensures
+ * that we don't zero a page that someone else has pinned. An
+ * exclusive content lock wouldn't be enough, because readers are
+ * allowed to drop the content lock after determining that a tuple
+ * is visible (see buffer access rules in README).
+ */
+ bufHdr = GetBufferDescriptor(buffer - 1);
+ sbres = StartSharedBufferIO(bufHdr, true, true, NULL);
+ }
+
+ Assert(sbres != BUFFER_IO_IN_PROGRESS);
+ need_to_zero = sbres == BUFFER_IO_READY_FOR_IO;
}
if (need_to_zero)
#endif
}
-/* helper for ReadBuffersCanStartIO(), to avoid repetition */
-static inline bool
-ReadBuffersCanStartIOOnce(Buffer buffer, bool nowait)
-{
- if (BufferIsLocal(buffer))
- return StartLocalBufferIO(GetLocalBufferDescriptor(-buffer - 1),
- true, nowait);
- else
- return StartBufferIO(GetBufferDescriptor(buffer - 1), true, nowait);
-}
-
-/*
- * Helper for AsyncReadBuffers that tries to get the buffer ready for IO.
- */
-static inline bool
-ReadBuffersCanStartIO(Buffer buffer, bool nowait)
-{
- /*
- * If this backend currently has staged IO, we need to submit the pending
- * IO before waiting for the right to issue IO, to avoid the potential for
- * deadlocks (and, more commonly, unnecessary delays for other backends).
- */
- if (!nowait && pgaio_have_staged())
- {
- if (ReadBuffersCanStartIOOnce(buffer, true))
- return true;
-
- /*
- * Unfortunately StartBufferIO() returning false doesn't allow to
- * distinguish between the buffer already being valid and IO already
- * being in progress. Since IO already being in progress is quite
- * rare, this approach seems fine.
- */
- pgaio_submit_staged();
- }
-
- return ReadBuffersCanStartIOOnce(buffer, nowait);
-}
-
/*
* We track various stats related to buffer hits. Because this is done in a
* few separate places, this helper exists for convenience.
IOContext io_context;
IOObject io_object;
instr_time io_start;
+ StartBufferIOResult status;
if (persistence == RELPERSISTENCE_TEMP)
{
pgstat_prepare_report_checksum_failure(operation->smgr->smgr_rlocator.locator.dbOid);
/*
- * Get IO handle before ReadBuffersCanStartIO(), as pgaio_io_acquire()
- * might block, which we don't want after setting IO_IN_PROGRESS.
+ * Get IO handle before StartBufferIO(), as pgaio_io_acquire() might
+ * block, which we don't want after setting IO_IN_PROGRESS.
*
* If we need to wait for IO before we can get a handle, submit
* already-staged IO first, so that other backends don't need to wait.
* for the outcome: either done, or something went wrong and we will
* retry.
*/
- if (!ReadBuffersCanStartIO(buffers[nblocks_done], false))
+ status = StartBufferIO(buffers[nblocks_done], true, true, NULL);
+ if (status != BUFFER_IO_READY_FOR_IO)
{
- /*
- * Someone else has already completed this block, we're done.
- *
- * When IO is necessary, ->nblocks_done is updated in
- * ProcessReadBuffersResult(), but that is not called if no IO is
- * necessary. Thus update here.
- */
- operation->nblocks_done += 1;
+ pgaio_io_release(ioh);
*nblocks_progress = 1;
+ if (status == BUFFER_IO_ALREADY_DONE)
+ {
+ /*
+ * Someone has already completed this block, we're done.
+ *
+ * When IO is necessary, ->nblocks_done is updated in
+ * ProcessReadBuffersResult(), but that is not called if no IO is
+ * necessary. Thus update here.
+ */
+ operation->nblocks_done += 1;
+ Assert(operation->nblocks_done <= operation->nblocks);
- pgaio_io_release(ioh);
- pgaio_wref_clear(&operation->io_wref);
+ Assert(!pgaio_wref_valid(&operation->io_wref));
- /*
- * Report and track this as a 'hit' for this backend, even though it
- * must have started out as a miss in PinBufferForBlock(). The other
- * backend will track this as a 'read'.
- */
- TrackBufferHit(io_object, io_context,
- operation->rel, operation->persistence,
- operation->smgr, operation->forknum,
- blocknum);
- return false;
+ /*
+ * Report and track this as a 'hit' for this backend, even though
+ * it must have started out as a miss in PinBufferForBlock(). The
+ * other backend will track this as a 'read'.
+ */
+ TrackBufferHit(io_object, io_context,
+ operation->rel, operation->persistence,
+ operation->smgr, operation->forknum,
+ blocknum);
+ return false;
+ }
+
+ /* The IO is already in-progress */
+ Assert(status == BUFFER_IO_IN_PROGRESS);
+
+ return true;
}
Assert(io_buffers[0] == buffers[nblocks_done]);
/*
* NB: As little code as possible should be added between the
- * ReadBuffersCanStartIO() above, the further ReadBuffersCanStartIO()s
- * below and the smgrstartreadv(), as some of the buffers are now marked
- * as IO_IN_PROGRESS and will thus cause other backends to wait.
+ * StartBufferIO() above, the further StartBufferIO()s below and the
+ * smgrstartreadv(), as some of the buffers are now marked as
+ * IO_IN_PROGRESS and will thus cause other backends to wait.
*/
/*
Assert(BufferGetBlockNumber(buffers[i - 1]) ==
BufferGetBlockNumber(buffers[i]) - 1);
- if (!ReadBuffersCanStartIO(buffers[i], true))
+ status = StartBufferIO(buffers[i], true, false, NULL);
+ if (status != BUFFER_IO_READY_FOR_IO)
break;
Assert(io_buffers[io_buffers_len] == buffers[i]);
* We *must* do smgr[zero]extend before succeeding, else the page
* will not be reserved by the kernel, and the next P_NEW call
* will decide to return the same page. Clear the BM_VALID bit,
- * do StartBufferIO() and proceed.
+ * do StartSharedBufferIO() and proceed.
*
* Loop to handle the very small possibility that someone re-sets
- * BM_VALID between our clearing it and StartBufferIO inspecting
- * it.
+ * BM_VALID between our clearing it and StartSharedBufferIO
+ * inspecting it.
*/
- do
+ while (true)
{
+ StartBufferIOResult sbres;
+
pg_atomic_fetch_and_u64(&existing_hdr->state, ~BM_VALID);
- } while (!StartBufferIO(existing_hdr, true, false));
+
+ sbres = StartSharedBufferIO(existing_hdr, true, true, NULL);
+
+ if (sbres != BUFFER_IO_ALREADY_DONE)
+ break;
+ }
}
else
{
LWLockRelease(partition_lock);
/* XXX: could combine the locked operations in it with the above */
- StartBufferIO(victim_buf_hdr, true, false);
+ StartSharedBufferIO(victim_buf_hdr, true, true, NULL);
}
}
* someone else flushed the buffer before we could, so we need not do
* anything.
*/
- if (!StartBufferIO(buf, false, false))
+ if (StartSharedBufferIO(buf, false, true, NULL) == BUFFER_IO_ALREADY_DONE)
return;
/* Setup error traceback support for ereport() */
{
ConditionVariable *cv = BufferDescriptorGetIOCV(buf);
+ /*
+	 * Should never end up here with unsubmitted IO, as no AIO-unaware code
+	 * may be used while in batch mode, and AIO-aware code needs to have
+	 * submitted all staged IO to avoid deadlocks & slowness.
+ */
+ Assert(!pgaio_have_staged());
+
ConditionVariablePrepareToSleep(cv);
for (;;)
{
}
/*
- * StartBufferIO: begin I/O on this buffer
+ * StartSharedBufferIO: begin I/O on this buffer
* (Assumptions)
- * My process is executing no IO on this buffer
* The buffer is Pinned
*
- * In some scenarios multiple backends could attempt the same I/O operation
- * concurrently. If someone else has already started I/O on this buffer then
- * we will wait for completion of the IO using WaitIO().
+ * In several scenarios the buffer may already be undergoing I/O in this or
+ * another backend. How to best handle that depends on the caller's
+ * situation. It might be appropriate to wait synchronously (e.g., because the
+ * buffer is about to be invalidated); wait asynchronously, using the buffer's
+ * IO wait reference (e.g., because the caller is doing readahead and doesn't
+ * need the buffer to be ready immediately); or to not wait at all (e.g.,
+ * because the caller is trying to combine IO for this buffer with another
+ * buffer).
+ *
+ * How and whether to wait is controlled by the wait and io_wref
+ * parameters. In detail:
+ *
+ * - If the caller passes a non-NULL io_wref and the buffer has an I/O wait
+ * reference, the *io_wref is set to the buffer's io_wref and
+ * BUFFER_IO_IN_PROGRESS is returned. This is done regardless of the wait
+ * parameter.
*
- * Input operations are only attempted on buffers that are not BM_VALID,
- * and output operations only on buffers that are BM_VALID and BM_DIRTY,
- * so we can always tell if the work is already done.
+ * - If the caller passes a NULL io_wref (i.e. the caller does not want to
+ *   asynchronously wait for the completion of the IO), wait = false, and
+ *   the buffer is undergoing IO, BUFFER_IO_IN_PROGRESS is returned.
*
- * Returns true if we successfully marked the buffer as I/O busy,
- * false if someone else already did the work.
+ * - If wait = true and either the buffer does not have a wait reference,
+ * or the caller passes io_wref = NULL, WaitIO() is used to wait for the IO
+ * to complete. To avoid the potential of deadlocks and unnecessary delays,
+ * all staged I/O is submitted before waiting.
*
- * If nowait is true, then we don't wait for an I/O to be finished by another
- * backend. In that case, false indicates either that the I/O was already
- * finished, or is still in progress. This is useful for callers that want to
- * find out if they can perform the I/O as part of a larger operation, without
- * waiting for the answer or distinguishing the reasons why not.
+ * Input operations are only attempted on buffers that are not BM_VALID, and
+ * output operations only on buffers that are BM_VALID and BM_DIRTY, so we can
+ * always tell if the work is already done. If no I/O is necessary,
+ * BUFFER_IO_ALREADY_DONE is returned.
+ *
+ * If we successfully marked the buffer as BM_IO_IN_PROGRESS,
+ * BUFFER_IO_READY_FOR_IO is returned.
*/
-bool
-StartBufferIO(BufferDesc *buf, bool forInput, bool nowait)
+StartBufferIOResult
+StartSharedBufferIO(BufferDesc *buf, bool forInput, bool wait, PgAioWaitRef *io_wref)
{
uint64 buf_state;
if (!(buf_state & BM_IO_IN_PROGRESS))
break;
- UnlockBufHdr(buf);
- if (nowait)
- return false;
- WaitIO(buf);
+
+ /* Join the existing IO */
+ if (io_wref != NULL && pgaio_wref_valid(&buf->io_wref))
+ {
+ *io_wref = buf->io_wref;
+ UnlockBufHdr(buf);
+
+ return BUFFER_IO_IN_PROGRESS;
+ }
+ else if (!wait)
+ {
+ UnlockBufHdr(buf);
+ return BUFFER_IO_IN_PROGRESS;
+ }
+ else
+ {
+ /*
+ * With wait = true, we always have to wait if the caller has
+ * passed io_wref = NULL.
+ *
+ * Even with io_wref != NULL, we have to wait if the buffer's wait
+			 * ref is not valid but the IO is in progress: someone else
+			 * started IO but hasn't set the wait ref yet. We have no choice
+ * but to wait until the IO completes.
+ */
+ UnlockBufHdr(buf);
+
+ /*
+ * If this backend currently has staged IO, submit it before
+ * waiting for in-progress IO, to avoid potential deadlocks and
+ * unnecessary delays.
+ */
+ pgaio_submit_staged();
+
+ WaitIO(buf);
+ }
}
/* Once we get here, there is definitely no I/O active on this buffer */
if (forInput ? (buf_state & BM_VALID) : !(buf_state & BM_DIRTY))
{
UnlockBufHdr(buf);
- return false;
+ return BUFFER_IO_ALREADY_DONE;
}
+ /*
+	 * No IO in progress and not already done; we will start IO. It's possible
+	 * that IO was previously in progress but did not complete, because it
+	 * errored out. In that case we do the IO ourselves.
+ */
UnlockBufHdrExt(buf, buf_state,
BM_IO_IN_PROGRESS, 0,
0);
ResourceOwnerRememberBufferIO(CurrentResourceOwner,
BufferDescriptorGetBuffer(buf));
- return true;
+ return BUFFER_IO_READY_FOR_IO;
+}
+
+/*
+ * Wrapper around StartSharedBufferIO / StartLocalBufferIO. Only to be used
+ * when the caller doesn't otherwise need to care about local vs shared. See
+ * StartSharedBufferIO() for details.
+ */
+StartBufferIOResult
+StartBufferIO(Buffer buffer, bool forInput, bool wait, PgAioWaitRef *io_wref)
+{
+ BufferDesc *buf_hdr;
+
+ if (BufferIsLocal(buffer))
+ {
+ buf_hdr = GetLocalBufferDescriptor(-buffer - 1);
+
+ return StartLocalBufferIO(buf_hdr, forInput, wait, io_wref);
+ }
+ else
+ {
+ buf_hdr = GetBufferDescriptor(buffer - 1);
+
+ return StartSharedBufferIO(buf_hdr, forInput, wait, io_wref);
+ }
}
/*
/*
* Try to start an I/O operation. There currently are no reasons for
- * StartLocalBufferIO to return false, so we raise an error in that case.
+ * StartLocalBufferIO to return anything other than
+ * BUFFER_IO_READY_FOR_IO, so we raise an error in that case.
*/
- if (!StartLocalBufferIO(bufHdr, false, false))
+ if (StartLocalBufferIO(bufHdr, false, true, NULL) != BUFFER_IO_READY_FOR_IO)
elog(ERROR, "failed to start write IO on local buffer");
/* Find smgr relation for buffer */
pg_atomic_unlocked_write_u64(&existing_hdr->state, buf_state);
/* no need to loop for local buffers */
- StartLocalBufferIO(existing_hdr, true, false);
+ StartLocalBufferIO(existing_hdr, true, true, NULL);
}
else
{
hresult->id = victim_buf_id;
- StartLocalBufferIO(victim_buf_hdr, true, false);
+ StartLocalBufferIO(victim_buf_hdr, true, true, NULL);
}
}
}
/*
- * Like StartBufferIO, but for local buffers
+ * Like StartSharedBufferIO, but for local buffers
*/
-bool
-StartLocalBufferIO(BufferDesc *bufHdr, bool forInput, bool nowait)
+StartBufferIOResult
+StartLocalBufferIO(BufferDesc *bufHdr, bool forInput, bool wait, PgAioWaitRef *io_wref)
{
uint64 buf_state;
/*
* With AIO the buffer could have IO in progress, e.g. when there are two
- * scans of the same relation. Either wait for the other IO or return
- * false.
+ * scans of the same relation. Either wait for the other IO (if wait =
+	 * true and io_wref == NULL) or return BUFFER_IO_IN_PROGRESS.
*/
if (pgaio_wref_valid(&bufHdr->io_wref))
{
- PgAioWaitRef iow = bufHdr->io_wref;
+ PgAioWaitRef buf_wref = bufHdr->io_wref;
- if (nowait)
- return false;
+ if (io_wref != NULL)
+ {
+ /* We've already asynchronously started this IO, so join it */
+ *io_wref = buf_wref;
+ return BUFFER_IO_IN_PROGRESS;
+ }
- pgaio_wref_wait(&iow);
+ /*
+ * For temp buffers we should never need to wait in
+ * StartLocalBufferIO() when called with io_wref == NULL while there
+ * are staged IOs, as it's not allowed to call code that is not aware
+ * of AIO while in batch mode.
+ */
+ Assert(!pgaio_have_staged());
+
+ if (!wait)
+ return BUFFER_IO_IN_PROGRESS;
+
+ pgaio_wref_wait(&buf_wref);
}
/* Once we get here, there is definitely no I/O active on this buffer */
buf_state = pg_atomic_read_u64(&bufHdr->state);
if (forInput ? (buf_state & BM_VALID) : !(buf_state & BM_DIRTY))
{
- return false;
+ return BUFFER_IO_ALREADY_DONE;
}
/* BM_IO_IN_PROGRESS isn't currently used for local buffers */
/* local buffers don't track IO using resowners */
- return true;
+ return BUFFER_IO_READY_FOR_IO;
}
/*
$io_method,
$psql_a,
"first StartBufferIO",
- qq(SELECT buffer_call_start_io($buf_id, for_input=>true, nowait=>false);),
+ qq(SELECT buffer_call_start_io($buf_id, for_input=>true, wait=>true);),
qr/^t$/,
qr/^$/);
$io_method,
$psql_a,
"second StartBufferIO fails, same session",
- qq(SELECT buffer_call_start_io($buf_id, for_input=>true, nowait=>true);),
+ qq(SELECT buffer_call_start_io($buf_id, for_input=>true, wait=>false);),
qr/^f$/,
qr/^$/);
psql_like(
$io_method,
$psql_b,
"second StartBufferIO fails, other session",
- qq(SELECT buffer_call_start_io($buf_id, for_input=>true, nowait=>true);),
+ qq(SELECT buffer_call_start_io($buf_id, for_input=>true, wait=>false);),
qr/^f$/,
qr/^$/);
$node,
$psql_b,
"blocking start buffer io",
- qq(SELECT buffer_call_start_io($buf_id, for_input=>true, nowait=>false);),
+ qq(SELECT buffer_call_start_io($buf_id, for_input=>true, wait=>true);),
"BufferIo");
# Terminate the IO, without marking it as success, this should trigger the
$io_method,
$psql_a,
"blocking buffer io w/ success: first start buffer io",
- qq(SELECT buffer_call_start_io($buf_id, for_input=>true, nowait=>false);),
+ qq(SELECT buffer_call_start_io($buf_id, for_input=>true, wait=>true);),
qr/^t$/,
qr/^$/);
$node,
$psql_b,
"blocking start buffer io",
- qq(SELECT buffer_call_start_io($buf_id, for_input=>true, nowait=>false);),
+ qq(SELECT buffer_call_start_io($buf_id, for_input=>true, wait=>true);),
"BufferIo");
# Terminate the IO, marking it as success
$io_method,
$psql_a,
"first StartLocalBufferIO",
- qq(SELECT buffer_call_start_io($buf_id, for_input=>true, nowait=>true);),
+ qq(SELECT buffer_call_start_io($buf_id, for_input=>true, wait=>false);),
qr/^t$/,
qr/^$/);
$io_method,
$psql_a,
"second StartLocalBufferIO succeeds, same session",
- qq(SELECT buffer_call_start_io($buf_id, for_input=>true, nowait=>true);),
+ qq(SELECT buffer_call_start_io($buf_id, for_input=>true, wait=>false);),
qr/^t$/,
qr/^$/);
$io_method,
$psql_a,
"StartLocalBufferIO after not marking valid succeeds, same session",
- qq(SELECT buffer_call_start_io($buf_id, for_input=>true, nowait=>true);),
+ qq(SELECT buffer_call_start_io($buf_id, for_input=>true, wait=>false);),
qr/^t$/,
qr/^$/);
$io_method,
$psql_a,
"StartLocalBufferIO after marking valid fails",
- qq(SELECT buffer_call_start_io($buf_id, for_input=>true, nowait=>false);),
+ qq(SELECT buffer_call_start_io($buf_id, for_input=>true, wait=>true);),
qr/^f$/,
qr/^$/);
my $buf_id =
$psql_b->query_safe(qq|SELECT buffer_create_toy('$table', 3)|);
$psql_b->query_safe(
- qq|SELECT buffer_call_start_io($buf_id, for_input=>true, nowait=>false)|
+ qq|SELECT buffer_call_start_io($buf_id, for_input=>true, wait=>true)|
);
query_wait_block(
$buf_id =
$psql_b->query_safe(qq|SELECT buffer_create_toy('$table', 3)|);
$psql_b->query_safe(
- qq|SELECT buffer_call_start_io($buf_id, for_input=>true, nowait=>false)|
+ qq|SELECT buffer_call_start_io($buf_id, for_input=>true, wait=>true)|
);
query_wait_block(