}
+# Tests for StartReadBuffers()
+sub test_read_buffers
+{
+ my $io_method = shift;
+ my $node = shift;
+ my ($ret, $output);
+ my $table;
+
+ my $psql_a = $node->background_psql('postgres', on_error_stop => 0);
+ my $psql_b = $node->background_psql('postgres', on_error_stop => 0);
+
+ $psql_a->query_safe(
+ qq(
+CREATE TEMPORARY TABLE tmp_ok(data int not null);
+INSERT INTO tmp_ok SELECT generate_series(1, 5000);
+));
+
+ foreach my $persistency (qw(normal temporary))
+ {
+ $table = $persistency eq 'normal' ? 'tbl_ok' : 'tmp_ok';
+
+ # check that consecutive misses are combined into one read
+ $psql_a->query_safe(qq|SELECT evict_rel('$table')|);
+ psql_like(
+ $io_method,
+ $psql_a,
+ "$persistency: read buffers, combine, block 0-1",
+ qq|SELECT blockoff, blocknum, io_reqd, nblocks FROM read_buffers('$table', 0, 2)|,
+ qr/^0\|0\|t\|2$/,
+ qr/^$/);
+
+ # but if we do it again, i.e. it's in the buffer pool, there will be
+ # two operations
+ psql_like(
+ $io_method,
+ $psql_a,
+ "$persistency: read buffers, doesn't combine hits, block 0-1",
+ qq|SELECT blockoff, blocknum, io_reqd, nblocks FROM read_buffers('$table', 0, 2)|,
+ qr/^0\|0\|f\|1\n1\|1\|f\|1$/,
+ qr/^$/);
+
+ # Check that a larger read interrupted by a hit works
+ psql_like(
+ $io_method,
+ $psql_a,
+ "$persistency: read buffers, prep, block 3",
+ qq|SELECT blockoff, blocknum, io_reqd, nblocks FROM read_buffers('$table', 3, 1)|,
+ qr/^0\|3\|t\|1$/,
+ qr/^$/);
+ psql_like(
+ $io_method,
+ $psql_a,
+ "$persistency: read buffers, interrupted by hit on 3, block 2-5",
+ qq|SELECT blockoff, blocknum, io_reqd, nblocks FROM read_buffers('$table', 2, 4)|,
+ qr/^0\|2\|t\|1\n1\|3\|f\|1\n2\|4\|t\|2$/,
+ qr/^$/);
+
+
+ # Verify that a read with an initial buffer hit works
+ $psql_a->query_safe(qq|SELECT evict_rel('$table')|);
+ psql_like(
+ $io_method,
+ $psql_a,
+ "$persistency: read buffers, miss, block 0",
+ qq|SELECT blockoff, blocknum, io_reqd, nblocks FROM read_buffers('$table', 0, 1)|,
+ qr/^0\|0\|t\|1$/,
+ qr/^$/);
+ psql_like(
+ $io_method,
+ $psql_a,
+ "$persistency: read buffers, hit, block 0",
+ qq|SELECT blockoff, blocknum, io_reqd, nblocks FROM read_buffers('$table', 0, 1)|,
+ qr/^0\|0\|f\|1$/,
+ qr/^$/);
+ psql_like(
+ $io_method,
+ $psql_a,
+ "$persistency: read buffers, miss, block 1",
+ qq|SELECT blockoff, blocknum, io_reqd, nblocks FROM read_buffers('$table', 1, 1)|,
+ qr/^0\|1\|t\|1$/,
+ qr/^$/);
+ psql_like(
+ $io_method,
+ $psql_a,
+ "$persistency: read buffers, hit, block 1",
+ qq|SELECT blockoff, blocknum, io_reqd, nblocks FROM read_buffers('$table', 1, 1)|,
+ qr/^0\|1\|f\|1$/,
+ qr/^$/);
+ psql_like(
+ $io_method,
+ $psql_a,
+ "$persistency: read buffers, hit, block 0-1",
+ qq|SELECT blockoff, blocknum, io_reqd, nblocks FROM read_buffers('$table', 0, 2)|,
+ qr/^0\|0\|f\|1\n1\|1\|f\|1$/,
+ qr/^$/);
+ psql_like(
+ $io_method,
+ $psql_a,
+ "$persistency: read buffers, hit 0-1, miss 2",
+ qq|SELECT blockoff, blocknum, io_reqd, nblocks FROM read_buffers('$table', 0, 3)|,
+ qr/^0\|0\|f\|1\n1\|1\|f\|1\n2\|2\|t\|1$/,
+ qr/^$/);
+
+ # Verify that a read with an initial miss and trailing buffer hit(s) works
+ $psql_a->query_safe(qq|SELECT invalidate_rel_block('$table', 0)|);
+ psql_like(
+ $io_method,
+ $psql_a,
+ "$persistency: read buffers, miss 0, hit 1-2",
+ qq|SELECT blockoff, blocknum, io_reqd, nblocks FROM read_buffers('$table', 0, 3)|,
+ qr/^0\|0\|t\|1\n1\|1\|f\|1\n2\|2\|f\|1$/,
+ qr/^$/);
+ $psql_a->query_safe(qq|SELECT invalidate_rel_block('$table', 1)|);
+ $psql_a->query_safe(qq|SELECT invalidate_rel_block('$table', 2)|);
+ $psql_a->query_safe(qq|SELECT * FROM read_buffers('$table', 3, 2)|);
+ psql_like(
+ $io_method,
+ $psql_a,
+ "$persistency: read buffers, miss 1-2, hit 3-4",
+ qq|SELECT blockoff, blocknum, io_reqd, nblocks FROM read_buffers('$table', 1, 4)|,
+ qr/^0\|1\|t\|2\n2\|3\|f\|1\n3\|4\|f\|1$/,
+ qr/^$/);
+
+ # Verify that we aren't doing reads larger than
+ # io_combine_limit. That's just enforced in read_buffers() function,
+ # but kinda still worth testing.
+ $psql_a->query_safe(qq|SELECT evict_rel('$table')|);
+ $psql_a->query_safe(qq|SET io_combine_limit=3|);
+ psql_like(
+ $io_method,
+ $psql_a,
+ "$persistency: read buffers, io_combine_limit has effect",
+ qq|SELECT blockoff, blocknum, io_reqd, nblocks FROM read_buffers('$table', 1, 5)|,
+ qr/^0\|1\|t\|3\n3\|4\|t\|2$/,
+ qr/^$/);
+ $psql_a->query_safe(qq|RESET io_combine_limit|);
+
+
+ # Test encountering buffer IO we started in the first block of the
+ # range.
+ $psql_a->query_safe(qq|SELECT evict_rel('$table')|);
+ $psql_a->query_safe(
+ qq|SELECT read_rel_block_ll('$table', 1, wait_complete=>false)|);
+ psql_like(
+ $io_method,
+ $psql_a,
+ "$persistency: read buffers, in-progress 1, read 1-3",
+ qq|SELECT blockoff, blocknum, io_reqd, nblocks FROM read_buffers('$table', 1, 3)|,
+ qr/^0\|1\|f\|1\n1\|2\|t\|2$/,
+ qr/^$/);
+
+ # Test in-progress IO in the middle block of the range
+ $psql_a->query_safe(qq|SELECT evict_rel('$table')|);
+ $psql_a->query_safe(
+ qq|SELECT read_rel_block_ll('$table', 2, wait_complete=>false)|);
+ psql_like(
+ $io_method,
+ $psql_a,
+ "$persistency: read buffers, in-progress 2, read 1-3",
+ qq|SELECT blockoff, blocknum, io_reqd, nblocks FROM read_buffers('$table', 1, 3)|,
+ qr/^0\|1\|t\|1\n1\|2\|f\|1\n2\|3\|t\|1$/,
+ qr/^$/);
+
+ # Test in-progress IO on the last block of the range
+ $psql_a->query_safe(qq|SELECT evict_rel('$table')|);
+ $psql_a->query_safe(
+ qq|SELECT read_rel_block_ll('$table', 3, wait_complete=>false)|);
+ psql_like(
+ $io_method,
+ $psql_a,
+ "$persistency: read buffers, in-progress 3, read 1-3",
+ qq|SELECT blockoff, blocknum, io_reqd, nblocks FROM
+read_buffers('$table', 1, 3)|,
+ qr/^0\|1\|t\|2\n2\|3\|f\|1$/,
+ qr/^$/);
+ }
+
+ # The remaining tests don't make sense for temp tables, as they are
+ # concerned with multiple sessions interacting with each other.
+ $table = 'tbl_ok';
+ my $persistency = 'normal';
+
+ # Test start buffer IO will split IO if there's IO in progress. We can't
+ # observe this with sync, as that does not start the IO operation in
+ # StartReadBuffers().
+ if ($io_method ne 'sync')
+ {
+ $psql_a->query_safe(qq|SELECT evict_rel('$table')|);
+
+ my $buf_id =
+ $psql_b->query_safe(qq|SELECT buffer_create_toy('$table', 3)|);
+ $psql_b->query_safe(
+ qq|SELECT buffer_call_start_io($buf_id, for_input=>true, nowait=>false)|
+ );
+
+ query_wait_block(
+ $io_method,
+ $node,
+ $psql_a,
+ "$persistency: read buffers blocks waiting for concurrent IO",
+ qq|SELECT blockoff, blocknum, io_reqd, nblocks FROM read_buffers('$table', 1, 5);\n|,
+ "BufferIo");
+ $psql_b->query_safe(
+ qq|SELECT buffer_call_terminate_io($buf_id, for_input=>true, succeed=>false, io_error=>false, release_aio=>false)|
+ );
+ pump_until(
+ $psql_a->{run}, $psql_a->{timeout},
+ \$psql_a->{stdout}, qr/0\|1\|t\|2\n2\|3\|t\|3/);
+ ok(1,
+ "$io_method: $persistency: IO was split due to concurrent failed IO"
+ );
+
+ # Same as before, except the concurrent IO succeeds this time
+ $psql_a->query_safe(qq|SELECT evict_rel('$table')|);
+ $buf_id =
+ $psql_b->query_safe(qq|SELECT buffer_create_toy('$table', 3)|);
+ $psql_b->query_safe(
+ qq|SELECT buffer_call_start_io($buf_id, for_input=>true, nowait=>false)|
+ );
+
+ query_wait_block(
+ $io_method,
+ $node,
+ $psql_a,
+ "$persistency: read buffers blocks waiting for concurrent IO",
+ qq|SELECT blockoff, blocknum, io_reqd, nblocks FROM read_buffers('$table', 1, 5);\n|,
+ "BufferIo");
+ $psql_b->query_safe(
+ qq|SELECT buffer_call_terminate_io($buf_id, for_input=>true, succeed=>true, io_error=>false, release_aio=>false)|
+ );
+ pump_until($psql_a->{run}, $psql_a->{timeout}, \$psql_a->{stdout},
+ qr/0\|1\|t\|2\n2\|3\|f\|1\n3\|4\|t\|2/);
+ ok(1,
+ "$io_method: $persistency: IO was split due to concurrent successful IO"
+ );
+ }
+
+ $psql_a->quit();
+ $psql_b->quit();
+}
+
+
# Run all tests that for the specified node / io_method
sub test_io_method
{
test_checksum($io_method, $node);
test_ignore_checksum($io_method, $node);
test_checksum_createdb($io_method, $node);
+ test_read_buffers($io_method, $node);
# generic injection tests
SKIP:
#include "postgres.h"
#include "access/relation.h"
+#include "catalog/pg_type.h"
#include "fmgr.h"
+#include "funcapi.h"
#include "storage/aio.h"
#include "storage/aio_internal.h"
#include "storage/buf_internals.h"
#include "storage/checksum.h"
#include "storage/ipc.h"
#include "storage/lwlock.h"
+#include "utils/array.h"
#include "utils/builtins.h"
#include "utils/injection_point.h"
#include "utils/rel.h"
+#include "utils/tuplestore.h"
PG_MODULE_MAGIC;
PG_RETURN_VOID();
}
-PG_FUNCTION_INFO_V1(invalidate_rel_block);
-Datum
-invalidate_rel_block(PG_FUNCTION_ARGS)
+/* helper for invalidate_rel_block() and evict_rel() */
+static void
+invalidate_one_block(Relation rel, ForkNumber forknum, BlockNumber blkno)
{
- Oid relid = PG_GETARG_OID(0);
- BlockNumber blkno = PG_GETARG_UINT32(1);
- Relation rel;
PrefetchBufferResult pr;
Buffer buf;
- rel = relation_open(relid, AccessExclusiveLock);
-
/*
* This is a gross hack, but there's no other API exposed that allows to
* get a buffer ID without actually reading the block in.
*/
- pr = PrefetchBuffer(rel, MAIN_FORKNUM, blkno);
+ pr = PrefetchBuffer(rel, forknum, blkno);
buf = pr.recent_buffer;
if (BufferIsValid(buf))
{
/* if the buffer contents aren't valid, this'll return false */
- if (ReadRecentBuffer(rel->rd_locator, MAIN_FORKNUM, blkno, buf))
+ if (ReadRecentBuffer(rel->rd_locator, forknum, blkno, buf))
{
BufferDesc *buf_hdr = BufferIsLocal(buf) ?
GetLocalBufferDescriptor(-buf - 1)
}
}
+}
+
+PG_FUNCTION_INFO_V1(invalidate_rel_block);
+Datum
+invalidate_rel_block(PG_FUNCTION_ARGS)
+{
+ Oid relid = PG_GETARG_OID(0);
+ BlockNumber blkno = PG_GETARG_UINT32(1);
+ Relation rel;
+
+ rel = relation_open(relid, AccessExclusiveLock);
+
+ invalidate_one_block(rel, MAIN_FORKNUM, blkno);
+
relation_close(rel, AccessExclusiveLock);
PG_RETURN_VOID();
}
+PG_FUNCTION_INFO_V1(evict_rel);
+Datum
+evict_rel(PG_FUNCTION_ARGS)
+{
+ Oid relid = PG_GETARG_OID(0);
+ Relation rel;
+
+ rel = relation_open(relid, AccessExclusiveLock);
+
+ /*
+ * EvictRelUnpinnedBuffers() doesn't support temp tables, so for temp
+ * tables we have to do it the expensive way and evict every possible
+ * buffer.
+ */
+ if (RelationUsesLocalBuffers(rel))
+ {
+ SMgrRelation smgr = RelationGetSmgr(rel);
+
+ for (int forknum = MAIN_FORKNUM; forknum <= MAX_FORKNUM; forknum++)
+ {
+ BlockNumber nblocks;
+
+ if (!smgrexists(smgr, forknum))
+ continue;
+
+ nblocks = smgrnblocks(smgr, forknum);
+
+ for (int blkno = 0; blkno < nblocks; blkno++)
+ {
+ invalidate_one_block(rel, forknum, blkno);
+ }
+ }
+ }
+ else
+ {
+ int32 buffers_evicted,
+ buffers_flushed,
+ buffers_skipped;
+
+ EvictRelUnpinnedBuffers(rel, &buffers_evicted, &buffers_flushed,
+ &buffers_skipped);
+ }
+
+ relation_close(rel, AccessExclusiveLock);
+
+
+ PG_RETURN_VOID();
+}
+
PG_FUNCTION_INFO_V1(buffer_create_toy);
Datum
buffer_create_toy(PG_FUNCTION_ARGS)
PG_RETURN_VOID();
}
+PG_FUNCTION_INFO_V1(read_buffers);
+/*
+ * Infrastructure to test StartReadBuffers()
+ */
+Datum
+read_buffers(PG_FUNCTION_ARGS)
+{
+ Oid relid = PG_GETARG_OID(0);
+ BlockNumber startblock = PG_GETARG_UINT32(1);
+ int32 nblocks = PG_GETARG_INT32(2);
+ ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
+ Relation rel;
+ SMgrRelation smgr;
+ int nblocks_done = 0;
+ int nblocks_disp = 0;
+ int nios = 0;
+ ReadBuffersOperation *operations;
+ Buffer *buffers;
+ Datum *buffers_datum;
+ bool *io_reqds;
+
+ Assert(nblocks > 0);
+
+ InitMaterializedSRF(fcinfo, 0);
+
+ /* at worst each block gets its own IO */
+ operations = palloc0(sizeof(ReadBuffersOperation) * nblocks);
+ buffers = palloc0(sizeof(Buffer) * nblocks);
+ buffers_datum = palloc0(sizeof(Datum) * nblocks);
+ io_reqds = palloc0(sizeof(bool) * nblocks);
+
+ rel = relation_open(relid, AccessShareLock);
+ smgr = RelationGetSmgr(rel);
+
+ /*
+ * Do StartReadBuffers() until IO for all the required blocks has been
+ * started (if required).
+ */
+ while (nblocks_done < nblocks)
+ {
+ ReadBuffersOperation *operation = &operations[nios];
+ int nblocks_this_io =
+ Min(nblocks - nblocks_done, io_combine_limit);
+
+ operation->rel = rel;
+ operation->smgr = smgr;
+ operation->persistence = rel->rd_rel->relpersistence;
+ operation->strategy = NULL;
+ operation->forknum = MAIN_FORKNUM;
+
+ io_reqds[nios] = StartReadBuffers(operation,
+ &buffers[nblocks_done],
+ startblock + nblocks_done,
+ &nblocks_this_io,
+ 0);
+ nios++;
+ nblocks_done += nblocks_this_io;
+ }
+
+ /*
+ * Now wait for all operations that required IO. This is done at the end,
+ * as otherwise waiting for IO in progress in other backends could
+ * influence the result for subsequent buffers / blocks.
+ */
+ for (int nio = 0; nio < nios; nio++)
+ {
+ ReadBuffersOperation *operation = &operations[nio];
+
+ if (io_reqds[nio])
+ WaitReadBuffers(operation);
+ }
+
+ /*
+ * Convert what has been done into SQL SRF return value.
+ */
+ for (int nio = 0; nio < nios; nio++)
+ {
+ ReadBuffersOperation *operation = &operations[nio];
+ int nblocks_this_io = operation->nblocks;
+ Datum values[5] = {0};
+ bool nulls[5] = {0};
+ ArrayType *buffers_arr;
+
+ /* convert buffer array to datum array */
+ for (int i = 0; i < nblocks_this_io; i++)
+ {
+ Buffer buf = operation->buffers[i];
+
+ Assert(buffers[nblocks_disp + i] == buf);
+ Assert(BufferGetBlockNumber(buf) == startblock + nblocks_disp + i);
+
+ buffers_datum[nblocks_disp + i] = Int32GetDatum(buf);
+ }
+
+ buffers_arr = construct_array_builtin(&buffers_datum[nblocks_disp],
+ nblocks_this_io,
+ INT4OID);
+
+ /* blockoff */
+ values[0] = Int32GetDatum(nblocks_disp);
+ nulls[0] = false;
+
+ /* blocknum */
+ values[1] = UInt32GetDatum(startblock + nblocks_disp);
+ nulls[1] = false;
+
+ /* io_reqd */
+ values[2] = BoolGetDatum(io_reqds[nio]);
+ nulls[2] = false;
+
+ /* nblocks */
+ values[3] = Int32GetDatum(nblocks_this_io);
+ nulls[3] = false;
+
+ /* array of buffers */
+ values[4] = PointerGetDatum(buffers_arr);
+ nulls[4] = false;
+
+ tuplestore_putvalues(rsinfo->setResult, rsinfo->setDesc, values, nulls);
+
+ nblocks_disp += nblocks_this_io;
+ }
+
+ /* release pins on all the buffers */
+ for (int nio = 0; nio < nios; nio++)
+ {
+ ReadBuffersOperation *operation = &operations[nio];
+
+ for (int i = 0; i < operation->nblocks; i++)
+ ReleaseBuffer(operation->buffers[i]);
+ }
+
+ /*
+ * Free explicitly, to have a chance to detect potential issues with too
+ * long lived references to the operation.
+ */
+ pfree(operations);
+ pfree(buffers);
+ pfree(buffers_datum);
+ pfree(io_reqds);
+
+ relation_close(rel, NoLock);
+
+ return (Datum) 0;
+}
+
+
PG_FUNCTION_INFO_V1(handle_get);
Datum
handle_get(PG_FUNCTION_ARGS)