int16 max_pinned_buffers;
int16 forwarded_buffers;
int16 pinned_buffers;
- int16 distance;
+
+ /*
+ * Limits of how far, in blocks, to look ahead for IO combining and for
+ * read-ahead.
+ *
+ * The limits for read-ahead and combining are tracked separately so that
+ * IOs can still be combined into larger reads even when the I/O subsystem
+ * keeps up at a low read-ahead distance, as larger IOs are more efficient.
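+ *
+ * For example, even when readahead_distance has decayed because the I/O
+ * subsystem is keeping up, a pending read can still be grown up to
+ * combine_distance blocks before being issued (see
+ * read_stream_should_look_ahead()).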
+ *
+ * Both are set to 0 when the end of the stream is reached.
+ */
+ int16 combine_distance;
+ int16 readahead_distance;
uint16 distance_decay_holdoff;
int16 initialized_buffers;
- int16 resume_distance;
+ int16 resume_readahead_distance;
+ int16 resume_combine_distance;
int read_buffers_flags;
bool sync_mode; /* using io_method=sync */
bool batch_mode; /* READ_STREAM_USE_BATCHING */
/* Shrink distance: no more look-ahead until buffers are released. */
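+ /*
+ * For example (illustrative numbers): with 3 buffers still pinned and a
+ * buffer_limit of 5, look-ahead is clamped to 8 blocks until the consumer
+ * releases some buffers.
+ */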
new_distance = stream->pinned_buffers + buffer_limit;
- if (stream->distance > new_distance)
- stream->distance = new_distance;
+ if (stream->readahead_distance > new_distance)
+ stream->readahead_distance = new_distance;
/* Unless we have nothing to give the consumer, stop here. */
if (stream->pinned_buffers > 0)
* perform IO asynchronously when starting out with a small look-ahead
* distance.
*/
- if (stream->distance > 1 && stream->ios_in_progress == 0)
+ if (stream->ios_in_progress == 0)
{
- if (stream->distance_decay_holdoff == 0)
- stream->distance--;
- else
+ if (stream->distance_decay_holdoff > 0)
stream->distance_decay_holdoff--;
+ else
+ {
+ if (stream->readahead_distance > 1)
+ stream->readahead_distance--;
+
+ /*
+ * For now we reduce the IO combine distance after
+ * sufficiently many buffer hits. There is no clear
+ * performance argument for doing so, but at the moment it
+ * is needed to make entry into the fast path work
+ * correctly: we require combine_distance == 1 to enter the
+ * fast path, because without that condition we would
+ * wrongly re-enter it when readahead_distance == 1 and
+ * pinned_buffers == 1, even though no further IO has been
+ * prepared in that situation.
+ */
+ if (stream->combine_distance > 1)
+ stream->combine_distance--;
+ }
}
}
else
read_stream_should_look_ahead(ReadStream *stream)
{
/* If the callback has signaled end-of-stream, we're done */
- if (stream->distance == 0)
+ if (stream->readahead_distance == 0)
return false;
/* never start more IOs than our cap */
if (stream->ios_in_progress >= stream->max_ios)
return false;
+ /*
+ * Allow looking further ahead if we are in the process of building a
+ * larger IO, the IO is not yet big enough, and no buffers are pinned yet
+ * (and hence no IO is in flight).
+ *
+ * We do so to allow building larger reads when readahead_distance is
+ * small (e.g. because the I/O subsystem is keeping up or
+ * effective_io_concurrency is small). That's a useful goal because larger
+ * reads are more CPU efficient than smaller reads, even if the system is
+ * not IO bound.
+ *
+ * The reason we do *not* do so when we already have a read prepared (i.e.
+ * why we check for pinned_buffers == 0) is that once we are actually
+ * reading ahead, we don't need it:
+ *
+ * - We won't issue unnecessarily small reads as
+ * read_stream_should_issue_now() will return false until the IO is
+ * suitably sized. The issuance of the pending read will be delayed until
+ * enough buffers have been consumed.
+ *
+ * - If we are not reading ahead aggressively enough, future
+ * WaitReadBuffers() calls will have to wait for IO to complete, leading to
+ * readahead_distance being increased. After that more full-sized IOs can
+ * be issued.
+ *
+ * Furthermore, if we did not have the pinned_buffers == 0 condition, we
+ * might end up issuing I/O more aggressively than we need.
+ *
+ * Note that a return of true here can lead to exceeding the read-ahead
+ * limit, but we won't exceed the buffer pin limit (because pinned_buffers
+ * == 0 and combine_distance is capped by max_pinned_buffers).
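+ *
+ * As an example (illustrative numbers): with combine_distance = 16,
+ * pinned_buffers = 0 and a pending read of 4 blocks, we return true here
+ * so that the callback keeps being consulted and the pending read can
+ * grow toward 16 blocks before it is issued.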
+ */
+ if (stream->pending_read_nblocks > 0 &&
+ stream->pinned_buffers == 0 &&
+ stream->pending_read_nblocks < stream->combine_distance)
+ return true;
+
/*
* Don't start more read-ahead if that'd put us over the distance limit
- * for doing read-ahead. As stream->distance is capped by
+ * for doing read-ahead. As stream->readahead_distance is capped by
* max_pinned_buffers, this prevents us from looking ahead so far that it
* would put us over the pin limit.
*/
- if (stream->pinned_buffers + stream->pending_read_nblocks >= stream->distance)
+ if (stream->pinned_buffers + stream->pending_read_nblocks >= stream->readahead_distance)
return false;
return true;
* If the callback has signaled end-of-stream, start the pending read
* immediately. There is no further potential for IO combining.
*/
- if (stream->distance == 0)
+ if (stream->readahead_distance == 0)
return true;
/*
- * If we've already reached io_combine_limit, there's no chance of growing
+ * If we've already reached combine_distance, there's no chance of growing
* the read further.
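+ *
+ * For example (illustrative numbers): with io_combine_limit = 16 but
+ * combine_distance currently decayed to 4, a 4-block pending read is
+ * issued now rather than waiting in the hope of growing it to 16.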
*/
- if (pending_read_nblocks >= stream->io_combine_limit)
+ if (pending_read_nblocks >= stream->combine_distance)
return true;
/*
if (blocknum == InvalidBlockNumber)
{
/* End of stream. */
- stream->distance = 0;
+ stream->readahead_distance = 0;
+ stream->combine_distance = 0;
break;
}
* stream. In the worst case we can always make progress one buffer at a
* time.
*/
- Assert(stream->pinned_buffers > 0 || stream->distance == 0);
+ Assert(stream->pinned_buffers > 0 || stream->readahead_distance == 0);
if (stream->batch_mode)
pgaio_exit_batchmode();
* doing full io_combine_limit sized reads.
*/
if (flags & READ_STREAM_FULL)
- stream->distance = Min(max_pinned_buffers, stream->io_combine_limit);
+ {
+ stream->readahead_distance = Min(max_pinned_buffers, stream->io_combine_limit);
+ stream->combine_distance = Min(max_pinned_buffers, stream->io_combine_limit);
+ }
else
- stream->distance = 1;
- stream->resume_distance = stream->distance;
+ {
+ stream->readahead_distance = 1;
+ stream->combine_distance = 1;
+ }
+ stream->resume_readahead_distance = stream->readahead_distance;
+ stream->resume_combine_distance = stream->combine_distance;
/*
* Since we always access the same relation, we can initialize parts of
Assert(stream->ios_in_progress == 0);
Assert(stream->forwarded_buffers == 0);
Assert(stream->pinned_buffers == 1);
- Assert(stream->distance == 1);
+ Assert(stream->readahead_distance == 1);
+ Assert(stream->combine_distance == 1);
Assert(stream->pending_read_nblocks == 0);
Assert(stream->per_buffer_data_size == 0);
Assert(stream->initialized_buffers > stream->oldest_buffer_index);
else
{
/* No more blocks, end of stream. */
- stream->distance = 0;
+ stream->readahead_distance = 0;
+ stream->combine_distance = 0;
stream->oldest_buffer_index = stream->next_buffer_index;
stream->pinned_buffers = 0;
stream->buffers[oldest_buffer_index] = InvalidBuffer;
Assert(stream->oldest_buffer_index == stream->next_buffer_index);
/* End of stream reached? */
- if (stream->distance == 0)
+ if (stream->readahead_distance == 0)
return InvalidBuffer;
/*
/* End of stream reached? */
if (stream->pinned_buffers == 0)
{
- Assert(stream->distance == 0);
+ Assert(stream->readahead_distance == 0);
return InvalidBuffer;
}
}
stream->ios[stream->oldest_io_index].buffer_index == oldest_buffer_index)
{
int16 io_index = stream->oldest_io_index;
- int32 distance; /* wider temporary value, clamped below */
+
+ /* wider temporary values, clamped below */
+ int32 readahead_distance;
+ int32 combine_distance;
/* Sanity check that we still agree on the buffers. */
Assert(stream->ios[io_index].op.buffers ==
if (++stream->oldest_io_index == stream->max_ios)
stream->oldest_io_index = 0;
- /* Look-ahead distance ramps up rapidly after we do I/O. */
- distance = stream->distance * 2;
- distance = Min(distance, stream->max_pinned_buffers);
- stream->distance = distance;
+ /*
+ * Read-ahead and IO combining distances ramp up rapidly after we do
+ * I/O.
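+ *
+ * For example, starting from 1 the distances double to 2, 4, 8, ... each
+ * time the stream consumes a buffer for which an IO had been started,
+ * until readahead_distance reaches max_pinned_buffers and
+ * combine_distance reaches Min(io_combine_limit, max_pinned_buffers).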
+ */
+ readahead_distance = stream->readahead_distance * 2;
+ readahead_distance = Min(readahead_distance, stream->max_pinned_buffers);
+ stream->readahead_distance = readahead_distance;
+
+ combine_distance = stream->combine_distance * 2;
+ combine_distance = Min(combine_distance, stream->io_combine_limit);
+ combine_distance = Min(combine_distance, stream->max_pinned_buffers);
+ stream->combine_distance = combine_distance;
/*
* As we needed IO, prevent distance from being reduced within our
if (stream->ios_in_progress == 0 &&
stream->forwarded_buffers == 0 &&
stream->pinned_buffers == 1 &&
- stream->distance == 1 &&
+ stream->readahead_distance == 1 &&
+ stream->combine_distance == 1 &&
stream->pending_read_nblocks == 0 &&
stream->per_buffer_data_size == 0)
{
BlockNumber
read_stream_pause(ReadStream *stream)
{
- stream->resume_distance = stream->distance;
- stream->distance = 0;
+ stream->resume_readahead_distance = stream->readahead_distance;
+ stream->resume_combine_distance = stream->combine_distance;
+ stream->readahead_distance = 0;
+ stream->combine_distance = 0;
return InvalidBlockNumber;
}
void
read_stream_resume(ReadStream *stream)
{
- stream->distance = stream->resume_distance;
+ stream->readahead_distance = stream->resume_readahead_distance;
+ stream->combine_distance = stream->resume_combine_distance;
}
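+
+/*
+ * Usage sketch (hypothetical callback, not part of this change): because
+ * read_stream_pause() returns InvalidBlockNumber, a block number callback
+ * that has temporarily run out of work can simply return it, and a later
+ * read_stream_resume() restores both look-ahead distances:
+ *
+ *     static BlockNumber
+ *     my_block_cb(ReadStream *stream, void *callback_private_data,
+ *                 void *per_buffer_data)
+ *     {
+ *         MyScanState *state = callback_private_data;
+ *
+ *         if (state->next_block > state->last_ready_block)
+ *             return read_stream_pause(stream);
+ *
+ *         return state->next_block++;
+ *     }
+ */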
/*
Buffer buffer;
/* Stop looking ahead. */
- stream->distance = 0;
+ stream->readahead_distance = 0;
+ stream->combine_distance = 0;
/* Forget buffered block number and fast path state. */
stream->buffered_blocknum = InvalidBlockNumber;
Assert(stream->ios_in_progress == 0);
/* Start off assuming data is cached. */
- stream->distance = 1;
- stream->resume_distance = stream->distance;
+ stream->readahead_distance = 1;
+ stream->combine_distance = 1;
+ stream->resume_readahead_distance = stream->readahead_distance;
+ stream->resume_combine_distance = stream->combine_distance;
stream->distance_decay_holdoff = 0;
}