]> git.ipfire.org Git - thirdparty/postgresql.git/commitdiff
read stream: Split decision about look ahead for AIO and combining
authorAndres Freund <andres@anarazel.de>
Sun, 5 Apr 2026 04:43:54 +0000 (00:43 -0400)
committerAndres Freund <andres@anarazel.de>
Sun, 5 Apr 2026 04:43:54 +0000 (00:43 -0400)
In a subsequent commit the read-ahead distance will only be increased when
waiting for IO. Without further work that would cause a regression: As IO
combining and read-ahead are currently controlled by the same mechanism, we
would end up not allowing IO combining when never needing to wait for IO (as
the distance ends up too small to allow for full sized IOs), which can
increase CPU overhead. A typical reason to not have to wait for IO completion
at a low look-ahead distance is use of io_uring with the to-be-read data in
the page cache. But even with worker the IO submission rate may be low enough
for the worker to keep up.

One might think that we could just always perform IO combining, but doing so
at the start of a scan can cause performance regressions:

1) Performing a large IO commonly has a higher latency than smaller IOs. That
   is not a problem once reading ahead far enough, but at the start of a stream
   it can lead to longer waits for IO completion.

2) Sometimes read streams will not be read to completion. Immediately starting
   with full sized IOs leads to more wasted effort. This is not commonly an
   issue with existing read stream users, but the upcoming use of read streams
   to fetch table pages as part of an index scan frequently encounters this.

Solve this issue by splitting ReadStream->distance into ->combine_distance and
->readahead_distance. Right now they are increased/decreased at the same time,
but that will change in the next commit.

One of the comments in read_stream_should_look_ahead() refers to a motivation
that only really exists as of the next commit, but without it the code doesn't
make sense on its own.

Reviewed-by: Melanie Plageman <melanieplageman@gmail.com>
Reviewed-by: Nazir Bilal Yavuz <byavuz81@gmail.com>
Discussion: https://postgr.es/m/f3xxfrkafjxpyqxywcxricxgyizjirfceychyxsgn7bwjp5eda@kwbduhy7tfmu
Discussion: https://postgr.es/m/CA+hUKGL2PhFyDoqrHefqasOnaXhSg48t1phs3VM8BAdrZqKZkw@mail.gmail.com

src/backend/storage/aio/read_stream.c

index 4a7a271c3e68095fb6ee9e8d36b5359aa72e2a7b..37c3921450b5f66684f94be49b5b96bc0e579325 100644 (file)
@@ -98,10 +98,23 @@ struct ReadStream
        int16           max_pinned_buffers;
        int16           forwarded_buffers;
        int16           pinned_buffers;
-       int16           distance;
+
+       /*
+        * Limit of how far, in blocks, to look-ahead for IO combining and for
+        * read-ahead.
+        *
+        * The limits for read-ahead and combining are handled separately to allow
+        * for IO combining even in cases where the I/O subsystem can keep up at a
+        * low read-ahead distance, as doing larger IOs is more efficient.
+        *
+        * Set to 0 when the end of the stream is reached.
+        */
+       int16           combine_distance;
+       int16           readahead_distance;
        uint16          distance_decay_holdoff;
        int16           initialized_buffers;
-       int16           resume_distance;
+       int16           resume_readahead_distance;
+       int16           resume_combine_distance;
        int                     read_buffers_flags;
        bool            sync_mode;              /* using io_method=sync */
        bool            batch_mode;             /* READ_STREAM_USE_BATCHING */
@@ -332,8 +345,8 @@ read_stream_start_pending_read(ReadStream *stream)
 
                /* Shrink distance: no more look-ahead until buffers are released. */
                new_distance = stream->pinned_buffers + buffer_limit;
-               if (stream->distance > new_distance)
-                       stream->distance = new_distance;
+               if (stream->readahead_distance > new_distance)
+                       stream->readahead_distance = new_distance;
 
                /* Unless we have nothing to give the consumer, stop here. */
                if (stream->pinned_buffers > 0)
@@ -374,12 +387,29 @@ read_stream_start_pending_read(ReadStream *stream)
                 * perform IO asynchronously when starting out with a small look-ahead
                 * distance.
                 */
-               if (stream->distance > 1 && stream->ios_in_progress == 0)
+               if (stream->ios_in_progress == 0)
                {
-                       if (stream->distance_decay_holdoff == 0)
-                               stream->distance--;
-                       else
+                       if (stream->distance_decay_holdoff > 0)
                                stream->distance_decay_holdoff--;
+                       else
+                       {
+                               if (stream->readahead_distance > 1)
+                                       stream->readahead_distance--;
+
+                               /*
+                                * For now we reduce the IO combine distance after
+                                * sufficiently many buffer hits. There is no clear
+                                * performance argument for doing so, but at the moment we
+                                * need to do so to make the entrance into fast_path work
+                                * correctly: We require combine_distance == 1 to enter
+                                * fast-path, as without that condition we would wrongly
+                                * re-enter fast-path when readahead_distance == 1 and
+                                * pinned_buffers == 1, as we would not yet have prepared
+                                * another IO in that situation.
+                                */
+                               if (stream->combine_distance > 1)
+                                       stream->combine_distance--;
+                       }
                }
        }
        else
@@ -448,20 +478,56 @@ static inline bool
 read_stream_should_look_ahead(ReadStream *stream)
 {
        /* If the callback has signaled end-of-stream, we're done */
-       if (stream->distance == 0)
+       if (stream->readahead_distance == 0)
                return false;
 
        /* never start more IOs than our cap */
        if (stream->ios_in_progress >= stream->max_ios)
                return false;
 
+       /*
+        * Allow looking further ahead if we are in the process of building a
+        * larger IO, the IO is not yet big enough, and we don't yet have IO in
+        * flight.
+        *
+        * We do so to allow building larger reads when readahead_distance is
+        * small (e.g. because the I/O subsystem is keeping up or
+        * effective_io_concurrency is small). That's a useful goal because larger
+        * reads are more CPU efficient than smaller reads, even if the system is
+        * not IO bound.
+        *
+        * The reason we do *not* do so when we already have a read prepared (i.e.
+        * why we check for pinned_buffers == 0) is once we are actually reading
+        * ahead, we don't need it:
+        *
+        * - We won't issue unnecessarily small reads as
+        * read_stream_should_issue_now() will return false until the IO is
+        * suitably sized. The issuance of the pending read will be delayed until
+        * enough buffers have been consumed.
+        *
+        * - If we are not reading ahead aggressively enough, future
+        * WaitReadBuffers() calls will return true, leading to readahead_distance
+        * being increased. After that more full-sized IOs can be issued.
+        *
+        * Furthermore, if we did not have the pinned_buffers == 0 condition, we
+        * might end up issuing I/O more aggressively than we need.
+        *
+        * Note that a return of true here can lead to exceeding the read-ahead
+        * limit, but we won't exceed the buffer pin limit (because pinned_buffers
+        * == 0 and combine_distance is capped by max_pinned_buffers).
+        */
+       if (stream->pending_read_nblocks > 0 &&
+               stream->pinned_buffers == 0 &&
+               stream->pending_read_nblocks < stream->combine_distance)
+               return true;
+
        /*
         * Don't start more read-ahead if that'd put us over the distance limit
-        * for doing read-ahead. As stream->distance is capped by
+        * for doing read-ahead. As stream->readahead_distance is capped by
         * max_pinned_buffers, this prevents us from looking ahead so far that it
         * would put us over the pin limit.
         */
-       if (stream->pinned_buffers + stream->pending_read_nblocks >= stream->distance)
+       if (stream->pinned_buffers + stream->pending_read_nblocks >= stream->readahead_distance)
                return false;
 
        return true;
@@ -490,14 +556,14 @@ read_stream_should_issue_now(ReadStream *stream)
         * If the callback has signaled end-of-stream, start the pending read
         * immediately. There is no further potential for IO combining.
         */
-       if (stream->distance == 0)
+       if (stream->readahead_distance == 0)
                return true;
 
        /*
-        * If we've already reached io_combine_limit, there's no chance of growing
+        * If we've already reached combine_distance, there's no chance of growing
         * the read further.
         */
-       if (pending_read_nblocks >= stream->io_combine_limit)
+       if (pending_read_nblocks >= stream->combine_distance)
                return true;
 
        /*
@@ -550,7 +616,8 @@ read_stream_look_ahead(ReadStream *stream)
                if (blocknum == InvalidBlockNumber)
                {
                        /* End of stream. */
-                       stream->distance = 0;
+                       stream->readahead_distance = 0;
+                       stream->combine_distance = 0;
                        break;
                }
 
@@ -597,7 +664,7 @@ read_stream_look_ahead(ReadStream *stream)
         * stream.  In the worst case we can always make progress one buffer at a
         * time.
         */
-       Assert(stream->pinned_buffers > 0 || stream->distance == 0);
+       Assert(stream->pinned_buffers > 0 || stream->readahead_distance == 0);
 
        if (stream->batch_mode)
                pgaio_exit_batchmode();
@@ -787,10 +854,17 @@ read_stream_begin_impl(int flags,
         * doing full io_combine_limit sized reads.
         */
        if (flags & READ_STREAM_FULL)
-               stream->distance = Min(max_pinned_buffers, stream->io_combine_limit);
+       {
+               stream->readahead_distance = Min(max_pinned_buffers, stream->io_combine_limit);
+               stream->combine_distance = Min(max_pinned_buffers, stream->io_combine_limit);
+       }
        else
-               stream->distance = 1;
-       stream->resume_distance = stream->distance;
+       {
+               stream->readahead_distance = 1;
+               stream->combine_distance = 1;
+       }
+       stream->resume_readahead_distance = stream->readahead_distance;
+       stream->resume_combine_distance = stream->combine_distance;
 
        /*
         * Since we always access the same relation, we can initialize parts of
@@ -889,7 +963,8 @@ read_stream_next_buffer(ReadStream *stream, void **per_buffer_data)
                Assert(stream->ios_in_progress == 0);
                Assert(stream->forwarded_buffers == 0);
                Assert(stream->pinned_buffers == 1);
-               Assert(stream->distance == 1);
+               Assert(stream->readahead_distance == 1);
+               Assert(stream->combine_distance == 1);
                Assert(stream->pending_read_nblocks == 0);
                Assert(stream->per_buffer_data_size == 0);
                Assert(stream->initialized_buffers > stream->oldest_buffer_index);
@@ -963,7 +1038,8 @@ read_stream_next_buffer(ReadStream *stream, void **per_buffer_data)
                else
                {
                        /* No more blocks, end of stream. */
-                       stream->distance = 0;
+                       stream->readahead_distance = 0;
+                       stream->combine_distance = 0;
                        stream->oldest_buffer_index = stream->next_buffer_index;
                        stream->pinned_buffers = 0;
                        stream->buffers[oldest_buffer_index] = InvalidBuffer;
@@ -979,7 +1055,7 @@ read_stream_next_buffer(ReadStream *stream, void **per_buffer_data)
                Assert(stream->oldest_buffer_index == stream->next_buffer_index);
 
                /* End of stream reached?  */
-               if (stream->distance == 0)
+               if (stream->readahead_distance == 0)
                        return InvalidBuffer;
 
                /*
@@ -993,7 +1069,7 @@ read_stream_next_buffer(ReadStream *stream, void **per_buffer_data)
                /* End of stream reached? */
                if (stream->pinned_buffers == 0)
                {
-                       Assert(stream->distance == 0);
+                       Assert(stream->readahead_distance == 0);
                        return InvalidBuffer;
                }
        }
@@ -1014,7 +1090,10 @@ read_stream_next_buffer(ReadStream *stream, void **per_buffer_data)
                stream->ios[stream->oldest_io_index].buffer_index == oldest_buffer_index)
        {
                int16           io_index = stream->oldest_io_index;
-               int32           distance;       /* wider temporary value, clamped below */
+
+               /* wider temporary values, clamped below */
+               int32           readahead_distance;
+               int32           combine_distance;
 
                /* Sanity check that we still agree on the buffers. */
                Assert(stream->ios[io_index].op.buffers ==
@@ -1027,10 +1106,18 @@ read_stream_next_buffer(ReadStream *stream, void **per_buffer_data)
                if (++stream->oldest_io_index == stream->max_ios)
                        stream->oldest_io_index = 0;
 
-               /* Look-ahead distance ramps up rapidly after we do I/O. */
-               distance = stream->distance * 2;
-               distance = Min(distance, stream->max_pinned_buffers);
-               stream->distance = distance;
+               /*
+                * Read-ahead and IO combining distances ramp up rapidly after we do
+                * I/O.
+                */
+               readahead_distance = stream->readahead_distance * 2;
+               readahead_distance = Min(readahead_distance, stream->max_pinned_buffers);
+               stream->readahead_distance = readahead_distance;
+
+               combine_distance = stream->combine_distance * 2;
+               combine_distance = Min(combine_distance, stream->io_combine_limit);
+               combine_distance = Min(combine_distance, stream->max_pinned_buffers);
+               stream->combine_distance = combine_distance;
 
                /*
                 * As we needed IO, prevent distance from being reduced within our
@@ -1111,7 +1198,8 @@ read_stream_next_buffer(ReadStream *stream, void **per_buffer_data)
        if (stream->ios_in_progress == 0 &&
                stream->forwarded_buffers == 0 &&
                stream->pinned_buffers == 1 &&
-               stream->distance == 1 &&
+               stream->readahead_distance == 1 &&
+               stream->combine_distance == 1 &&
                stream->pending_read_nblocks == 0 &&
                stream->per_buffer_data_size == 0)
        {
@@ -1157,8 +1245,10 @@ read_stream_next_block(ReadStream *stream, BufferAccessStrategy *strategy)
 BlockNumber
 read_stream_pause(ReadStream *stream)
 {
-       stream->resume_distance = stream->distance;
-       stream->distance = 0;
+       stream->resume_readahead_distance = stream->readahead_distance;
+       stream->resume_combine_distance = stream->combine_distance;
+       stream->readahead_distance = 0;
+       stream->combine_distance = 0;
        return InvalidBlockNumber;
 }
 
@@ -1170,7 +1260,8 @@ read_stream_pause(ReadStream *stream)
 void
 read_stream_resume(ReadStream *stream)
 {
-       stream->distance = stream->resume_distance;
+       stream->readahead_distance = stream->resume_readahead_distance;
+       stream->combine_distance = stream->resume_combine_distance;
 }
 
 /*
@@ -1186,7 +1277,8 @@ read_stream_reset(ReadStream *stream)
        Buffer          buffer;
 
        /* Stop looking ahead. */
-       stream->distance = 0;
+       stream->readahead_distance = 0;
+       stream->combine_distance = 0;
 
        /* Forget buffered block number and fast path state. */
        stream->buffered_blocknum = InvalidBlockNumber;
@@ -1218,8 +1310,10 @@ read_stream_reset(ReadStream *stream)
        Assert(stream->ios_in_progress == 0);
 
        /* Start off assuming data is cached. */
-       stream->distance = 1;
-       stream->resume_distance = stream->distance;
+       stream->readahead_distance = 1;
+       stream->combine_distance = 1;
+       stream->resume_readahead_distance = stream->readahead_distance;
+       stream->resume_combine_distance = stream->combine_distance;
        stream->distance_decay_holdoff = 0;
 }