git.ipfire.org Git - thirdparty/postgresql.git/commitdiff
test_aio: Add read_stream test infrastructure & tests
author Andres Freund <andres@anarazel.de>
Fri, 27 Mar 2026 22:47:04 +0000 (18:47 -0400)
committer Andres Freund <andres@anarazel.de>
Fri, 27 Mar 2026 22:52:43 +0000 (18:52 -0400)
While we have a lot of indirect coverage of read streams, there are corner
cases that are hard to test when only indirectly controlling and observing the
read stream.  This commit adds an SQL callable SRF interface for a read stream
and uses that in a few tests.

To make some of the tests possible, the injection point infrastructure in
test_aio had to be expanded to allow blocking IO completion.

While at it, fix a wrong debug message in inj_io_short_read_hook().

Author: Andres Freund <andres@anarazel.de>
Reviewed-by: Nazir Bilal Yavuz <byavuz81@gmail.com>
Reviewed-by: Melanie Plageman <melanieplageman@gmail.com>
Discussion: https://postgr.es/m/zljergweqti7x67lg5ije2rzjusie37nslsnkjkkby4laqqbfw@3p3zu522yykv

src/test/modules/test_aio/meson.build
src/test/modules/test_aio/t/004_read_stream.pl [new file with mode: 0644]
src/test/modules/test_aio/test_aio--1.0.sql
src/test/modules/test_aio/test_aio.c
src/tools/pgindent/typedefs.list

index 18a797f3a3b5d20135ff429b4c123702e72e052c..909f81d96c14de8c7c095bb742699bd456e3e260 100644 (file)
@@ -33,6 +33,7 @@ tests += {
       't/001_aio.pl',
       't/002_io_workers.pl',
       't/003_initdb.pl',
+      't/004_read_stream.pl',
     ],
   },
 }
diff --git a/src/test/modules/test_aio/t/004_read_stream.pl b/src/test/modules/test_aio/t/004_read_stream.pl
new file mode 100644 (file)
index 0000000..32311c0
--- /dev/null
@@ -0,0 +1,278 @@
+# Copyright (c) 2025-2026, PostgreSQL Global Development Group
+
+use strict;
+use warnings FATAL => 'all';
+
+use PostgreSQL::Test::Cluster;
+use PostgreSQL::Test::Utils;
+use Test::More;
+
+use FindBin;
+use lib $FindBin::RealBin;
+
+use TestAio;
+
+
+# One cluster serves the whole file; it is stopped and started again around
+# every io_method change.
+my $node = PostgreSQL::Test::Cluster->new('test');
+$node->init();
+TestAio::configure($node);
+
+my $base_settings = qq(
+max_connections=8
+io_method=worker
+);
+$node->append_conf('postgresql.conf', $base_settings);
+
+# Create the extension and the test table once, up front.
+$node->start();
+test_setup($node);
+$node->stop();
+
+
+# Then repeat the tests under each io_method reported as supported.
+my @io_methods = TestAio::supported_io_methods();
+for my $method (@io_methods)
+{
+       $node->adjust_conf('postgresql.conf', 'io_method', $method);
+       $node->start();
+       test_io_method($method, $node);
+       $node->stop();
+}
+
+done_testing();
+
+
+# Install the test_aio extension and create the table the read stream tests
+# operate on.
+sub test_setup
+{
+       my ($node) = @_;
+
+       my $setup_sql = qq(
+CREATE EXTENSION test_aio;
+
+CREATE TABLE largeish(k int not null) WITH (FILLFACTOR=10);
+INSERT INTO largeish(k) SELECT generate_series(1, 10000);
+);
+
+       $node->safe_psql('postgres', $setup_sql);
+       ok(1, "setup");
+}
+
+
+# Run read streams whose block list names the same block more than once,
+# against both the regular table and a temporary table.  Only successful
+# completion of each query is checked.
+sub test_repeated_blocks
+{
+       my ($io_method, $node) = @_;
+
+       my $session = $node->background_psql('postgres', on_error_stop => 0);
+
+       # Block list with back-to-back duplicates, one query per relation.
+       # Block 0 grows the distance enough that the stream will look ahead and
+       # try to start a pending read for block 2 (and later block 4) twice
+       # before returning any buffers.
+       my $dup_regular = qq/SELECT * FROM read_stream_for_blocks('largeish',
+                  ARRAY[0, 2, 2, 4, 4]);/;
+       my $dup_temp = qq/SELECT * FROM read_stream_for_blocks('largeish_temp',
+                  ARRAY[0, 2, 2, 4, 4]);/;
+
+       # Preventing larger reads makes testing easier
+       $session->query_safe(qq/SET io_combine_limit = 1/);
+
+       # First pass right after evicting the relation: repeated misses
+       $session->query_safe(qq/SELECT evict_rel('largeish');/);
+       $session->query_safe($dup_regular);
+       ok(1, "$io_method: stream missing the same block repeatedly");
+
+       # Second pass without evicting in between: repeated hits
+       $session->query_safe($dup_regular);
+       ok(1, "$io_method: stream hitting the same block repeatedly");
+
+       # Evict again, then walk up to block 6 and back down to block 0 so that
+       # blocks are requested a second time later in the same stream
+       $session->query_safe(qq/SELECT evict_rel('largeish');/);
+       $session->query_safe(
+               qq/SELECT * FROM read_stream_for_blocks('largeish',
+                  ARRAY[0, 1, 2, 3, 4, 5, 6, 5, 4, 3, 2, 1, 0]);/);
+       ok(1, "$io_method: stream accessing same block");
+
+       # Same idea with a temp table, using invalidate_rel_block() to evict
+       # individual local buffers.
+       $session->query_safe(
+               qq/CREATE TEMP TABLE largeish_temp(k int not null) WITH (FILLFACTOR=10);
+                  INSERT INTO largeish_temp(k) SELECT generate_series(1, 200);/);
+
+       # Evict exactly the blocks requested below to force misses
+       foreach my $blockno (0, 2, 4)
+       {
+               $session->query_safe(
+                       qq/SELECT invalidate_rel_block('largeish_temp', $blockno);/);
+       }
+
+       $session->query_safe($dup_temp);
+       ok(1, "$io_method: temp stream missing the same block repeatedly");
+
+       # Now the blocks are cached, so repeated access should be hits
+       $session->query_safe($dup_temp);
+       ok(1, "$io_method: temp stream hitting the same block repeatedly");
+
+       $session->quit();
+}
+
+
+# Test a read stream in session a running into buffers whose IO was started
+# by session b and is held in flight by the completion-wait injection point.
+#
+# Every wait is checked: a poll_query_until() timeout aborts the test, and
+# the pump_until() results feed ok(), so a stream that never returns the
+# expected blocks is reported as a failure instead of a pass.
+sub test_inject_foreign
+{
+       my $io_method = shift;
+       my $node = shift;
+
+       my $psql_a = $node->background_psql('postgres', on_error_stop => 0);
+       my $psql_b = $node->background_psql('postgres', on_error_stop => 0);
+
+       my $pid_a = $psql_a->query_safe(qq/SELECT pg_backend_pid();/);
+
+       # Results of the pump_until() calls; declared once because redeclaring a
+       # lexical in the same scope warns, and warnings are fatal in this file.
+       my $stream_ok;
+       my $error_seen;
+
+
+       ###
+       # Test read stream encountering buffers undergoing IO in another backend,
+       # with the other backend's reads succeeding.
+       ###
+       $psql_a->query_safe(qq/SELECT evict_rel('largeish');/);
+
+       $psql_b->query_safe(
+               qq/SELECT inj_io_completion_wait(pid=>pg_backend_pid(),
+                  relfilenode=>pg_relation_filenode('largeish'));/);
+
+       # Queue the read without waiting for its result; it stays blocked until
+       # inj_io_completion_continue() is called.
+       $psql_b->{stdin} .= qq/SELECT read_rel_block_ll('largeish',
+               blockno=>5, nblocks=>1);\n/;
+       $psql_b->{run}->pump_nb();
+
+       $node->poll_query_until(
+               'postgres', qq/SELECT wait_event FROM pg_stat_activity
+                       WHERE wait_event = 'completion_wait';/,
+               'completion_wait')
+         or die "$io_method: timed out waiting for IO to block in completion_wait";
+
+       # Block 5 is undergoing IO in session b, so session a will move on to start
+       # a new IO for block 7.
+       $psql_a->{stdin} .= qq/SELECT array_agg(blocknum) FROM
+               read_stream_for_blocks('largeish', ARRAY[0, 2, 5, 7]);\n/;
+       $psql_a->{run}->pump_nb();
+
+       $node->poll_query_until('postgres',
+               qq(SELECT wait_event FROM pg_stat_activity WHERE pid = $pid_a),
+               'AioIoCompletion')
+         or die "$io_method: timed out waiting for session a to wait for IO";
+
+       $node->safe_psql('postgres', qq/SELECT inj_io_completion_continue()/);
+
+       $stream_ok = pump_until(
+               $psql_a->{run}, $psql_a->{timeout},
+               \$psql_a->{stdout}, qr/\{0,2,5,7\}/);
+       $psql_a->{stdout} = '';
+
+       ok($stream_ok,
+               qq/$io_method: read stream encounters succeeding IO by another backend/
+       );
+
+       ###
+       # Test read stream encountering buffers undergoing IO in another backend,
+       # with the other backend's reads failing.
+       ###
+       $psql_a->query_safe(qq/SELECT evict_rel('largeish');/);
+
+       $psql_b->query_safe(
+               qq/SELECT inj_io_completion_wait(pid=>pg_backend_pid(),
+                  relfilenode=>pg_relation_filenode('largeish'));/);
+
+       $psql_b->query_safe(
+               qq/SELECT inj_io_short_read_attach(-errno_from_string('EIO'),
+                  pid=>pg_backend_pid(),
+                  relfilenode=>pg_relation_filenode('largeish'));/);
+
+       $psql_b->{stdin} .= qq/SELECT read_rel_block_ll('largeish',
+               blockno=>5, nblocks=>1);\n/;
+       $psql_b->{run}->pump_nb();
+
+       $node->poll_query_until(
+               'postgres',
+               qq/SELECT wait_event FROM pg_stat_activity
+                  WHERE wait_event = 'completion_wait';/,
+               'completion_wait')
+         or die "$io_method: timed out waiting for IO to block in completion_wait";
+
+       $psql_a->{stdin} .= qq/SELECT array_agg(blocknum) FROM
+               read_stream_for_blocks('largeish', ARRAY[0, 2, 5, 7]);\n/;
+       $psql_a->{run}->pump_nb();
+
+       $node->poll_query_until('postgres',
+               qq(SELECT wait_event FROM pg_stat_activity WHERE pid = $pid_a),
+               'AioIoCompletion')
+         or die "$io_method: timed out waiting for session a to wait for IO";
+
+       $node->safe_psql('postgres', qq/SELECT inj_io_completion_continue()/);
+
+       $stream_ok = pump_until(
+               $psql_a->{run}, $psql_a->{timeout},
+               \$psql_a->{stdout}, qr/\{0,2,5,7\}/);
+       $psql_a->{stdout} = '';
+
+       $error_seen =
+         pump_until($psql_b->{run}, $psql_b->{timeout}, \$psql_b->{stderr},
+               qr/ERROR.*could not read blocks 5\.\.5/);
+       ok($error_seen, "$io_method: injected error occurred");
+       $psql_b->{stderr} = '';
+       $psql_b->query_safe(qq/SELECT inj_io_short_read_detach();/);
+
+       ok($stream_ok,
+               qq/$io_method: read stream encounters failing IO by another backend/);
+
+
+       ###
+       # Test read stream encountering two buffers that are undergoing the same
+       # IO, started by another backend.
+       ###
+       $psql_a->query_safe(qq/SELECT evict_rel('largeish');/);
+
+       $psql_b->query_safe(
+               qq/SELECT inj_io_completion_wait(pid=>pg_backend_pid(),
+                  relfilenode=>pg_relation_filenode('largeish'));/);
+
+       $psql_b->{stdin} .= qq/SELECT read_rel_block_ll('largeish',
+               blockno=>2, nblocks=>3);\n/;
+       $psql_b->{run}->pump_nb();
+
+       $node->poll_query_until(
+               'postgres',
+               qq/SELECT wait_event FROM pg_stat_activity
+                       WHERE wait_event = 'completion_wait';/,
+               'completion_wait')
+         or die "$io_method: timed out waiting for IO to block in completion_wait";
+
+       # Blocks 2 and 4 are undergoing IO initiated by session b
+       $psql_a->{stdin} .= qq/SELECT array_agg(blocknum) FROM
+               read_stream_for_blocks('largeish', ARRAY[0, 2, 4]);\n/;
+       $psql_a->{run}->pump_nb();
+
+       $node->poll_query_until('postgres',
+               qq(SELECT wait_event FROM pg_stat_activity WHERE pid = $pid_a),
+               'AioIoCompletion')
+         or die "$io_method: timed out waiting for session a to wait for IO";
+
+       $node->safe_psql('postgres', qq/SELECT inj_io_completion_continue()/);
+
+       $stream_ok = pump_until(
+               $psql_a->{run}, $psql_a->{timeout},
+               \$psql_a->{stdout}, qr/\{0,2,4\}/);
+       $psql_a->{stdout} = '';
+
+       ok($stream_ok,
+               qq/$io_method: read stream encounters two buffer read in one IO/);
+
+       $psql_a->quit();
+       $psql_b->quit();
+}
+
+
+# Run all read stream tests against a node that was started with the given
+# io_method.  The injection point tests are skipped when the build does not
+# advertise injection point support through the environment.
+sub test_io_method
+{
+       my $io_method = shift;
+       my $node = shift;
+
+       is($node->safe_psql('postgres', 'SHOW io_method'),
+               $io_method, "$io_method: io_method set correctly");
+
+       test_repeated_blocks($io_method, $node);
+
+  SKIP:
+       {
+               # Treat an unset variable like "no": comparing undef with eq would
+               # raise an uninitialized-value warning, which is fatal in this file.
+               # test_inject_foreign() reports four results.
+               skip 'Injection points not supported by this build', 4
+                 unless ($ENV{enable_injection_points} // '') eq 'yes';
+               test_inject_foreign($io_method, $node);
+       }
+}
index 86beb563b6a1f1787a3f10037d5a04498356be98..4a5a379b3c5d52e1940978c59aa65dff40442777 100644 (file)
@@ -57,6 +57,13 @@ CREATE FUNCTION read_buffers(rel regclass, startblock int4, nblocks int4, OUT bl
 RETURNS SETOF record STRICT
 AS 'MODULE_PATHNAME' LANGUAGE C;
 
+/*
+ * Read stream related functions
+ *
+ * read_stream_for_blocks() returns one row per element of "blocks":
+ * blockoff is the element's position in the array (counted from 0),
+ * blocknum the requested block number and buf the buffer the read stream
+ * returned for it.
+ */
+CREATE FUNCTION read_stream_for_blocks(rel regclass, blocks int4[], OUT blockoff int4, OUT blocknum int4, OUT buf int4)
+RETURNS SETOF record STRICT
+AS 'MODULE_PATHNAME' LANGUAGE C;
+
 
 /*
  * Handle related functions
@@ -98,8 +105,16 @@ AS 'MODULE_PATHNAME' LANGUAGE C;
 /*
  * Injection point related functions
  */
-CREATE FUNCTION inj_io_short_read_attach(result int)
-RETURNS pg_catalog.void STRICT
+CREATE FUNCTION inj_io_completion_wait(pid int DEFAULT NULL, relfilenode oid DEFAULT NULL, blockno int4 DEFAULT NULL)
+RETURNS pg_catalog.void
+AS 'MODULE_PATHNAME' LANGUAGE C;
+
+CREATE FUNCTION inj_io_completion_continue()
+RETURNS pg_catalog.void
+AS 'MODULE_PATHNAME' LANGUAGE C;
+
+CREATE FUNCTION inj_io_short_read_attach(result int, pid int DEFAULT NULL, relfilenode oid DEFAULT NULL)
+RETURNS pg_catalog.void
 AS 'MODULE_PATHNAME' LANGUAGE C;
 
 CREATE FUNCTION inj_io_short_read_detach()
index 7bcf4bfdf225855aac9f4dbad5dd31f149eded5b..eeeab9054c8432369336f65f2b4da5a8d3ae785e 100644 (file)
 #include "storage/buf_internals.h"
 #include "storage/bufmgr.h"
 #include "storage/checksum.h"
+#include "storage/condition_variable.h"
 #include "storage/ipc.h"
 #include "storage/lwlock.h"
+#include "storage/proc.h"
+#include "storage/procnumber.h"
+#include "storage/read_stream.h"
 #include "utils/array.h"
 #include "utils/builtins.h"
 #include "utils/injection_point.h"
 #include "utils/rel.h"
 #include "utils/tuplestore.h"
+#include "utils/wait_event.h"
 
 
 PG_MODULE_MAGIC;
@@ -41,13 +46,31 @@ PG_MODULE_MAGIC;
 
 typedef struct InjIoErrorState
 {
+       ConditionVariable cv;   /* completion-wait hook sleeps on this; inj_io_completion_continue() broadcasts */
+
        bool            enabled_short_read;
        bool            enabled_reopen;
 
+       bool            enabled_completion_wait;        /* while set, matching IOs block in the hook */
+       Oid                     completion_wait_relfilenode;    /* InvalidOid matches any relation */
+       BlockNumber completion_wait_blockno;    /* InvalidBlockNumber matches any block */
+       pid_t           completion_wait_pid;    /* InvalidPid matches any IO owner */
+       uint32          completion_wait_event;  /* wait event passed to ConditionVariableSleep() */
+
        bool            short_read_result_set;
+       Oid                     short_read_relfilenode; /* InvalidOid matches any relation */
+       pid_t           short_read_pid; /* InvalidPid matches any IO owner */
        int                     short_read_result;
 } InjIoErrorState;
 
+typedef struct BlocksReadStreamData
+{
+       int                     nblocks;        /* number of entries in blocks[] */
+       int                     curblock;       /* index of the next entry the callback hands out */
+       uint32     *blocks;     /* requested block numbers; points at the input array's data */
+} BlocksReadStreamData;
+
+
 static InjIoErrorState *inj_io_error_state;
 
 /* Shared memory init callbacks */
@@ -88,11 +111,15 @@ test_aio_shmem_startup(void)
                /* First time through, initialize */
                inj_io_error_state->enabled_short_read = false;
                inj_io_error_state->enabled_reopen = false;
+               inj_io_error_state->enabled_completion_wait = false;
+
+               ConditionVariableInit(&inj_io_error_state->cv);
+               inj_io_error_state->completion_wait_event = WaitEventInjectionPointNew("completion_wait");
 
 #ifdef USE_INJECTION_POINTS
                InjectionPointAttach("aio-process-completion-before-shared",
                                                         "test_aio",
-                                                        "inj_io_short_read",
+                                                        "inj_io_completion_hook",
                                                         NULL,
                                                         0);
                InjectionPointLoad("aio-process-completion-before-shared");
@@ -386,7 +413,7 @@ read_rel_block_ll(PG_FUNCTION_ARGS)
        if (nblocks <= 0 || nblocks > PG_IOV_MAX)
                elog(ERROR, "nblocks is out of range");
 
-       rel = relation_open(relid, AccessExclusiveLock);
+       rel = relation_open(relid, AccessShareLock);
 
        for (int i = 0; i < nblocks; i++)
        {
@@ -816,6 +843,85 @@ read_buffers(PG_FUNCTION_ARGS)
 }
 
 
+/*
+ * Read stream callback: hand out the block numbers stored in the
+ * BlocksReadStreamData in array order, then report end of stream by
+ * returning InvalidBlockNumber.
+ */
+static BlockNumber
+read_stream_for_blocks_cb(ReadStream *stream,
+                                                 void *callback_private_data,
+                                                 void *per_buffer_data)
+{
+       BlocksReadStreamData *state = callback_private_data;
+       BlockNumber next = InvalidBlockNumber;
+
+       /* advance only while entries remain */
+       if (state->curblock < state->nblocks)
+               next = state->blocks[state->curblock++];
+
+       return next;
+}
+
+/*
+ * read_stream_for_blocks(rel regclass, blocks int4[])
+ *
+ * Drive a read stream over the main fork of rel, requesting the listed
+ * blocks in array order.  One row is emitted per array element: the
+ * element's index, the block number, and the buffer the stream returned for
+ * it.  Each buffer is released again right after its row has been stored.
+ *
+ * Raises an error if the argument is not a one-dimensional, NULL-free int4
+ * array, if the stream ends before all blocks were returned, or if it
+ * returns a buffer after the last requested block.
+ */
+PG_FUNCTION_INFO_V1(read_stream_for_blocks);
+Datum
+read_stream_for_blocks(PG_FUNCTION_ARGS)
+{
+       Oid                     relid = PG_GETARG_OID(0);
+       ArrayType  *blocksarray = PG_GETARG_ARRAYTYPE_P(1);
+       ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
+       Relation        rel;
+       BlocksReadStreamData stream_data;
+       ReadStream *stream;
+
+       InitMaterializedSRF(fcinfo, 0);
+
+       /*
+        * We expect the input to be an N-element int4 array; verify that. We
+        * don't need to use deconstruct_array() since the array data is just
+        * going to look like a C array of N int4 values.
+        */
+       if (ARR_NDIM(blocksarray) != 1 ||
+               ARR_HASNULL(blocksarray) ||
+               ARR_ELEMTYPE(blocksarray) != INT4OID)
+               elog(ERROR, "expected 1 dimensional int4 array");
+
+       stream_data.curblock = 0;
+       stream_data.nblocks = ARR_DIMS(blocksarray)[0];
+       stream_data.blocks = (uint32 *) ARR_DATA_PTR(blocksarray);
+
+       rel = relation_open(relid, AccessShareLock);
+
+       stream = read_stream_begin_relation(READ_STREAM_FULL,
+                                                                               NULL,
+                                                                               rel,
+                                                                               MAIN_FORKNUM,
+                                                                               read_stream_for_blocks_cb,
+                                                                               &stream_data,
+                                                                               0);
+
+       for (int i = 0; i < stream_data.nblocks; i++)
+       {
+               Buffer          buf = read_stream_next_buffer(stream, NULL);
+               Datum           values[3] = {0};
+               bool            nulls[3] = {0};
+
+               if (!BufferIsValid(buf))
+                       elog(ERROR, "read_stream_next_buffer() call %d is unexpectedly invalid", i);
+
+               /*
+                * All three output columns are declared int4, so build them with
+                * Int32GetDatum().  This matters most for buf: Buffer is a signed
+                * type and local (temp table) buffers have negative numbers.
+                */
+               values[0] = Int32GetDatum(i);
+               values[1] = Int32GetDatum((int32) stream_data.blocks[i]);
+               values[2] = Int32GetDatum(buf);
+
+               tuplestore_putvalues(rsinfo->setResult, rsinfo->setDesc, values, nulls);
+
+               ReleaseBuffer(buf);
+       }
+
+       /* the callback has no blocks left, so the stream must be exhausted too */
+       if (read_stream_next_buffer(stream, NULL) != InvalidBuffer)
+               elog(ERROR, "read_stream_next_buffer() call %d is unexpectedly valid",
+                        stream_data.nblocks);
+
+       read_stream_end(stream);
+
+       /* NoLock: the lock taken by relation_open() is not released here */
+       relation_close(rel, NoLock);
+
+       return (Datum) 0;
+}
+
+
 PG_FUNCTION_INFO_V1(handle_get);
 Datum
 handle_get(PG_FUNCTION_ARGS)
@@ -886,75 +992,169 @@ batch_end(PG_FUNCTION_ARGS)
 }
 
 #ifdef USE_INJECTION_POINTS
-extern PGDLLEXPORT void inj_io_short_read(const char *name,
-                                                                                 const void *private_data,
-                                                                                 void *arg);
+extern PGDLLEXPORT void inj_io_completion_hook(const char *name,
+                                                                                          const void *private_data,
+                                                                                          void *arg);
 extern PGDLLEXPORT void inj_io_reopen(const char *name,
                                                                          const void *private_data,
                                                                          void *arg);
 
-void
-inj_io_short_read(const char *name, const void *private_data, void *arg)
+/*
+ * Decide whether the short-read injection should rewrite this IO's result.
+ *
+ * Returns true only if short reads are enabled with a result value set, the
+ * IO is a vectored read, the IO's owner and relation pass the optional pid /
+ * relfilenode filters (InvalidPid / InvalidOid match anything), and the
+ * injected result is smaller than the IO's actual result.
+ */
+static bool
+inj_io_short_read_matches(PgAioHandle *ioh)
+{
+       PGPROC     *io_proc;
+       int32           io_pid;
+       int32           inj_pid;
+       PgAioTargetData *td;
+
+       if (!inj_io_error_state->enabled_short_read)
+               return false;
+
+       if (!inj_io_error_state->short_read_result_set)
+               return false;
+
+       /*
+        * The caller goes on to walk ioh->op_data.read and the IO's iovec, which
+        * is only meaningful for vectored reads.
+        */
+       if (ioh->op != PGAIO_OP_READV)
+               return false;
+
+       io_proc = GetPGProcByNumber(pgaio_io_get_owner(ioh));
+       io_pid = io_proc->pid;
+       inj_pid = inj_io_error_state->short_read_pid;
+
+       if (inj_pid != InvalidPid && inj_pid != io_pid)
+               return false;
+
+       td = pgaio_io_get_target_data(ioh);
+
+       if (inj_io_error_state->short_read_relfilenode != InvalidOid &&
+               td->smgr.rlocator.relNumber != inj_io_error_state->short_read_relfilenode)
+               return false;
+
+       /*
+        * Only shorten reads that are actually longer than the target size,
+        * otherwise we can trigger over-reads.
+        */
+       if (inj_io_error_state->short_read_result >= ioh->result)
+               return false;
+
+       return true;
+}
+
+/*
+ * Does this IO pass the filters configured for the completion-wait
+ * injection?
+ *
+ * Returns false while completion waiting is disabled.  Otherwise each of the
+ * pid, relfilenode and block number filters is applied unless it holds its
+ * "invalid" value, in which case it matches anything.  The block number
+ * filter matches if the block lies within the range covered by the IO.
+ */
+static bool
+inj_io_completion_wait_matches(PgAioHandle *ioh)
+{
+       PgAioTargetData *target;
+       int32           owner_pid;
+       int32           want_pid;
+       Oid                     want_relfilenode;
+       BlockNumber want_blockno;
+       BlockNumber first_blockno;
+
+       if (!inj_io_error_state->enabled_completion_wait)
+               return false;
+
+       /* filter on the backend that owns the IO */
+       owner_pid = GetPGProcByNumber(pgaio_io_get_owner(ioh))->pid;
+       want_pid = inj_io_error_state->completion_wait_pid;
+       if (want_pid != InvalidPid && want_pid != owner_pid)
+               return false;
+
+       target = pgaio_io_get_target_data(ioh);
+
+       /* filter on the relation being read */
+       want_relfilenode = inj_io_error_state->completion_wait_relfilenode;
+       if (want_relfilenode != InvalidOid &&
+               want_relfilenode != target->smgr.rlocator.relNumber)
+               return false;
+
+       /* filter on a block inside [first_blockno, first_blockno + nblocks) */
+       want_blockno = inj_io_error_state->completion_wait_blockno;
+       first_blockno = target->smgr.blockNum;
+       if (want_blockno != InvalidBlockNumber &&
+               (want_blockno < first_blockno ||
+                want_blockno >= (first_blockno + target->smgr.nblocks)))
+               return false;
+
+       return true;
+}
+
+/*
+ * Completion-wait part of the IO completion injection point: if the IO
+ * passed in arg matches the configured filters, sleep on the shared
+ * condition variable until it no longer matches.
+ */
+static void
+inj_io_completion_wait_hook(const char *name, const void *private_data, void *arg)
+{
+       PgAioHandle *handle = (PgAioHandle *) arg;
+
+       if (inj_io_completion_wait_matches(handle))
+       {
+               ConditionVariablePrepareToSleep(&inj_io_error_state->cv);
+
+               /* re-check after every wakeup, the filters may have changed */
+               while (inj_io_completion_wait_matches(handle))
+                       ConditionVariableSleep(&inj_io_error_state->cv,
+                                                                  inj_io_error_state->completion_wait_event);
+
+               ConditionVariableCancelSleep();
+       }
+}
+
+static void
+inj_io_short_read_hook(const char *name, const void *private_data, void *arg)
 {
        PgAioHandle *ioh = (PgAioHandle *) arg;
 
        ereport(LOG,
                        errmsg("short read injection point called, is enabled: %d",
-                                  inj_io_error_state->enabled_reopen),
+                                  inj_io_error_state->enabled_short_read),
                        errhidestmt(true), errhidecontext(true));
 
-       if (inj_io_error_state->enabled_short_read)
+       if (inj_io_short_read_matches(ioh))
        {
+               struct iovec *iov = &pgaio_ctl->iovecs[ioh->iovec_off];
+               int32           old_result = ioh->result;
+               int32           new_result = inj_io_error_state->short_read_result;
+               int32           processed = 0;
+
+               ereport(LOG,
+                               errmsg("short read inject point, changing result from %d to %d",
+                                          old_result, new_result),
+                               errhidestmt(true), errhidecontext(true));
+
                /*
-                * Only shorten reads that are actually longer than the target size,
-                * otherwise we can trigger over-reads.
+                * The underlying IO actually completed OK, and thus the "invalid"
+                * portion of the IOV actually contains valid data. That can hide a
+                * lot of problems, e.g. if we were to wrongly mark a buffer, that
+                * wasn't read according to the shortened-read, IO as valid, the
+                * contents would look valid and we might miss a bug.
+                *
+                * To avoid that, iterate through the IOV and zero out the "failed"
+                * portion of the IO.
                 */
-               if (inj_io_error_state->short_read_result_set
-                       && ioh->op == PGAIO_OP_READV
-                       && inj_io_error_state->short_read_result <= ioh->result)
+               for (int i = 0; i < ioh->op_data.read.iov_length; i++)
                {
-                       struct iovec *iov = &pgaio_ctl->iovecs[ioh->iovec_off];
-                       int32           old_result = ioh->result;
-                       int32           new_result = inj_io_error_state->short_read_result;
-                       int32           processed = 0;
-
-                       ereport(LOG,
-                                       errmsg("short read inject point, changing result from %d to %d",
-                                                  old_result, new_result),
-                                       errhidestmt(true), errhidecontext(true));
-
-                       /*
-                        * The underlying IO actually completed OK, and thus the "invalid"
-                        * portion of the IOV actually contains valid data. That can hide
-                        * a lot of problems, e.g. if we were to wrongly mark a buffer,
-                        * that wasn't read according to the shortened-read, IO as valid,
-                        * the contents would look valid and we might miss a bug.
-                        *
-                        * To avoid that, iterate through the IOV and zero out the
-                        * "failed" portion of the IO.
-                        */
-                       for (int i = 0; i < ioh->op_data.read.iov_length; i++)
+                       if (processed + iov[i].iov_len <= new_result)
+                               processed += iov[i].iov_len;
+                       else if (processed <= new_result)
                        {
-                               if (processed + iov[i].iov_len <= new_result)
-                                       processed += iov[i].iov_len;
-                               else if (processed <= new_result)
-                               {
-                                       uint32          ok_part = new_result - processed;
-
-                                       memset((char *) iov[i].iov_base + ok_part, 0, iov[i].iov_len - ok_part);
-                                       processed += iov[i].iov_len;
-                               }
-                               else
-                               {
-                                       memset((char *) iov[i].iov_base, 0, iov[i].iov_len);
-                               }
-                       }
+                               uint32          ok_part = new_result - processed;
 
-                       ioh->result = new_result;
+                               memset((char *) iov[i].iov_base + ok_part, 0, iov[i].iov_len - ok_part);
+                               processed += iov[i].iov_len;
+                       }
+                       else
+                       {
+                               memset((char *) iov[i].iov_base, 0, iov[i].iov_len);
+                       }
                }
+
+               ioh->result = new_result;
        }
 }
 
+void
+inj_io_completion_hook(const char *name, const void *private_data, void *arg)
+{
+       inj_io_completion_wait_hook(name, private_data, arg);   /* first: may sleep while the IO matches the wait filters */
+       inj_io_short_read_hook(name, private_data, arg);        /* then: may shorten the read's result */
+}
+
 void
 inj_io_reopen(const char *name, const void *private_data, void *arg)
 {
@@ -968,6 +1168,42 @@ inj_io_reopen(const char *name, const void *private_data, void *arg)
 }
 #endif
 
+/*
+ * inj_io_completion_wait(pid, relfilenode, blockno)
+ *
+ * Arm the completion-wait injection: IO completions matching the given
+ * filters block until inj_io_completion_continue() is called.  A NULL
+ * argument leaves the corresponding filter unset, i.e. matching anything.
+ *
+ * Raises an error in builds without injection point support.
+ */
+PG_FUNCTION_INFO_V1(inj_io_completion_wait);
+Datum
+inj_io_completion_wait(PG_FUNCTION_ARGS)
+{
+#ifdef USE_INJECTION_POINTS
+       /*
+        * Store the filters before setting the enabled flag.  The filters may
+        * still hold the match-anything values left behind by
+        * inj_io_completion_continue(), and enabling first would let an
+        * unrelated IO completing concurrently match them.  No memory barrier
+        * is used, so this narrows the window rather than closing it.
+        */
+       inj_io_error_state->completion_wait_pid =
+               PG_ARGISNULL(0) ? InvalidPid : PG_GETARG_INT32(0);
+       inj_io_error_state->completion_wait_relfilenode =
+               PG_ARGISNULL(1) ? InvalidOid : PG_GETARG_OID(1);
+       inj_io_error_state->completion_wait_blockno =
+               PG_ARGISNULL(2) ? InvalidBlockNumber : PG_GETARG_UINT32(2);
+       inj_io_error_state->enabled_completion_wait = true;
+#else
+       elog(ERROR, "injection points not supported");
+#endif
+
+       PG_RETURN_VOID();
+}
+
+/*
+ * inj_io_completion_continue()
+ *
+ * Disarm the completion-wait injection, reset its filters to their
+ * match-anything values and wake everything sleeping on the condition
+ * variable.
+ *
+ * Raises an error in builds without injection point support.
+ */
+PG_FUNCTION_INFO_V1(inj_io_completion_continue);
+Datum
+inj_io_completion_continue(PG_FUNCTION_ARGS)
+{
+#ifdef USE_INJECTION_POINTS
+       InjIoErrorState *state = inj_io_error_state;
+
+       state->enabled_completion_wait = false;
+       state->completion_wait_pid = InvalidPid;
+       state->completion_wait_relfilenode = InvalidOid;
+       state->completion_wait_blockno = InvalidBlockNumber;
+
+       /* sleepers re-check the match after waking and find it disabled */
+       ConditionVariableBroadcast(&state->cv);
+#else
+       elog(ERROR, "injection points not supported");
+#endif
+
+       PG_RETURN_VOID();
+}
+
 PG_FUNCTION_INFO_V1(inj_io_short_read_attach);
 Datum
 inj_io_short_read_attach(PG_FUNCTION_ARGS)
@@ -977,6 +1213,10 @@ inj_io_short_read_attach(PG_FUNCTION_ARGS)
        inj_io_error_state->short_read_result_set = !PG_ARGISNULL(0);
        if (inj_io_error_state->short_read_result_set)
                inj_io_error_state->short_read_result = PG_GETARG_INT32(0);
+       inj_io_error_state->short_read_pid =
+               PG_ARGISNULL(1) ? InvalidPid : PG_GETARG_INT32(1);
+       inj_io_error_state->short_read_relfilenode =
+               PG_ARGISNULL(2) ? InvalidOid : PG_GETARG_OID(2);
 #else
        elog(ERROR, "injection points not supported");
 #endif
index 112653c16803b3f18292d11c27aa6ab2c594acc6..b2c7c9e6f7c81c9610caf4b3ab15823706653b72 100644 (file)
@@ -309,6 +309,7 @@ BlockSampler
 BlockSamplerData
 BlockedProcData
 BlockedProcsData
+BlocksReadStreamData
 BlocktableEntry
 BloomBuildState
 BloomFilter