#include "nodes/execnodes.h"
#include "optimizer/plancat.h"
#include "pgstat.h"
+#include "storage/read_stream.h"
#include "utils/fmgrprotos.h"
#include "utils/index_selfuncs.h"
#include "utils/rel.h"
Relation heapRel; /* heap relation descriptor */
} HashBuildState;
+/*
+ * Working state for streaming reads in hashbulkdelete.
+ *
+ * metap points at cached metapage contents used only to drive the
+ * BUCKET_TO_BLKNO mapping in the stream callback.  If a concurrent split
+ * is detected mid-scan, the caller refreshes metap and resets the stream,
+ * since the mapping (hashm_spares[]) may have changed.
+ */
+typedef struct
+{
+ HashMetaPage metap; /* cached metapage for BUCKET_TO_BLKNO */
+ Bucket next_bucket; /* next bucket whose primary page to read */
+ Bucket max_bucket; /* inclusive; stop when next_bucket > max_bucket */
+} HashBulkDeleteStreamPrivate;
+
static void hashbuildCallback(Relation index,
ItemPointer tid,
Datum *values,
bool *isnull,
bool tupleIsAlive,
void *state);
+static BlockNumber hash_bulkdelete_read_stream_cb(ReadStream *stream,
+ void *callback_private_data,
+ void *per_buffer_data);
/*
scan->opaque = NULL;
}
+/*
+ * Read stream callback for hashbulkdelete.
+ *
+ * Hands the stream the block number of the next bucket's primary page,
+ * translated via BUCKET_TO_BLKNO() on the cached metapage, and returns
+ * InvalidBlockNumber once every bucket up to max_bucket has been issued.
+ */
+static BlockNumber
+hash_bulkdelete_read_stream_cb(ReadStream *stream,
+ void *callback_private_data,
+ void *per_buffer_data)
+{
+ HashBulkDeleteStreamPrivate *priv = callback_private_data;
+ Bucket this_bucket;
+
+ /* All requested buckets handed out; signal end-of-stream. */
+ if (priv->next_bucket > priv->max_bucket)
+ return InvalidBlockNumber;
+
+ /*
+ * Advance via a local copy: BUCKET_TO_BLKNO() is a macro that evaluates
+ * its bucket argument more than once, so the increment must not be
+ * embedded in the macro call.
+ */
+ this_bucket = priv->next_bucket;
+ priv->next_bucket++;
+ return BUCKET_TO_BLKNO(priv->metap, this_bucket);
+}
+
/*
* Bulk deletion of all index entries pointing to a set of heap tuples.
* The set of target tuples is specified via a callback routine that tells
Buffer metabuf = InvalidBuffer;
HashMetaPage metap;
HashMetaPage cachedmetap;
+ HashBulkDeleteStreamPrivate stream_private;
+ ReadStream *stream = NULL;
tuples_removed = 0;
num_index_tuples = 0;
cur_bucket = 0;
cur_maxbucket = orig_maxbucket;
-loop_top:
+ /* Set up streaming read for primary bucket pages */
+ stream_private.metap = cachedmetap;
+ stream_private.next_bucket = cur_bucket;
+ stream_private.max_bucket = cur_maxbucket;
+
+ /*
+ * It is safe to use batchmode as hash_bulkdelete_read_stream_cb takes no
+ * locks.
+ */
+ stream = read_stream_begin_relation(READ_STREAM_MAINTENANCE |
+ READ_STREAM_USE_BATCHING,
+ info->strategy,
+ rel,
+ MAIN_FORKNUM,
+ hash_bulkdelete_read_stream_cb,
+ &stream_private,
+ 0);
+
+bucket_loop:
while (cur_bucket <= cur_maxbucket)
{
BlockNumber bucket_blkno;
* We need to acquire a cleanup lock on the primary bucket page to out
* wait concurrent scans before deleting the dead tuples.
*/
- buf = ReadBufferExtended(rel, MAIN_FORKNUM, blkno, RBM_NORMAL, info->strategy);
+ buf = read_stream_next_buffer(stream, NULL);
+ Assert(BufferIsValid(buf));
LockBufferForCleanup(buf);
_hash_checkpage(rel, buf, LH_BUCKET_PAGE);
{
cachedmetap = _hash_getcachedmetap(rel, &metabuf, true);
Assert(cachedmetap != NULL);
+
+ /*
+ * Reset stream with updated metadata for remaining buckets.
+ * The BUCKET_TO_BLKNO mapping depends on hashm_spares[],
+ * which may have changed.
+ */
+ stream_private.metap = cachedmetap;
+ stream_private.next_bucket = cur_bucket + 1;
+ stream_private.max_bucket = cur_maxbucket;
+ read_stream_reset(stream);
}
}
cachedmetap = _hash_getcachedmetap(rel, &metabuf, true);
Assert(cachedmetap != NULL);
cur_maxbucket = cachedmetap->hashm_maxbucket;
- goto loop_top;
+
+ /* Reset stream to process additional buckets from split */
+ stream_private.metap = cachedmetap;
+ stream_private.next_bucket = cur_bucket;
+ stream_private.max_bucket = cur_maxbucket;
+ read_stream_reset(stream);
+ goto bucket_loop;
}
+ /* Stream should be exhausted since we processed all buckets */
+ Assert(read_stream_next_buffer(stream, NULL) == InvalidBuffer);
+ read_stream_end(stream);
+
/* Okay, we're really done. Update tuple count in metapage. */
START_CRIT_SECTION();