}
/*
- * index_parallelscan_estimate - estimate shared memory for parallel scan
- *
- * When instrument=true, estimate includes SharedIndexScanInstrumentation
- * space. When parallel_aware=true, estimate includes whatever space the
- * index AM's amestimateparallelscan routine requested when called.
+ * Estimates the shared memory needed for parallel scan, including any
+ * AM-specific parallel scan state.
*/
Size
index_parallelscan_estimate(Relation indexRelation, int nkeys, int norderbys,
- Snapshot snapshot, bool instrument,
- bool parallel_aware, int nworkers)
+ Snapshot snapshot)
{
Size nbytes;
- Assert(instrument || parallel_aware);
-
RELATION_CHECKS;
nbytes = offsetof(ParallelIndexScanDescData, ps_snapshot_data);
nbytes = add_size(nbytes, EstimateSnapshotSpace(snapshot));
nbytes = MAXALIGN(nbytes);
- if (instrument)
- {
- Size sharedinfosz;
-
- sharedinfosz = offsetof(SharedIndexScanInstrumentation, winstrument) +
- nworkers * sizeof(IndexScanInstrumentation);
- nbytes = add_size(nbytes, sharedinfosz);
- nbytes = MAXALIGN(nbytes);
- }
-
/*
* If parallel scan index AM interface can't be used (or index AM provides
* no such interface), assume there is no AM-specific data needed
*/
- if (parallel_aware &&
- indexRelation->rd_indam->amestimateparallelscan != NULL)
+ if (indexRelation->rd_indam->amestimateparallelscan != NULL)
nbytes = add_size(nbytes,
indexRelation->rd_indam->amestimateparallelscan(indexRelation,
nkeys,
*/
void
index_parallelscan_initialize(Relation heapRelation, Relation indexRelation,
- Snapshot snapshot, bool instrument,
- bool parallel_aware, int nworkers,
- SharedIndexScanInstrumentation **sharedinfo,
+ Snapshot snapshot,
ParallelIndexScanDesc target)
{
Size offset;
- Assert(instrument || parallel_aware);
-
RELATION_CHECKS;
offset = add_size(offsetof(ParallelIndexScanDescData, ps_snapshot_data),
target->ps_locator = heapRelation->rd_locator;
target->ps_indexlocator = indexRelation->rd_locator;
- target->ps_offset_ins = 0;
target->ps_offset_am = 0;
SerializeSnapshot(snapshot, target->ps_snapshot_data);
- if (instrument)
- {
- Size sharedinfosz;
-
- target->ps_offset_ins = offset;
- sharedinfosz = offsetof(SharedIndexScanInstrumentation, winstrument) +
- nworkers * sizeof(IndexScanInstrumentation);
- offset = add_size(offset, sharedinfosz);
- offset = MAXALIGN(offset);
-
- /* Set leader's *sharedinfo pointer, and initialize stats */
- *sharedinfo = (SharedIndexScanInstrumentation *)
- OffsetToPointer(target, target->ps_offset_ins);
- memset(*sharedinfo, 0, sharedinfosz);
- (*sharedinfo)->num_workers = nworkers;
- }
-
/* aminitparallelscan is optional; assume no-op if not provided by AM */
- if (parallel_aware && indexRelation->rd_indam->aminitparallelscan != NULL)
+ if (indexRelation->rd_indam->aminitparallelscan != NULL)
{
void *amtarget;
e->pcxt);
break;
case T_IndexScanState:
+ if (planstate->plan->parallel_aware)
+ ExecIndexScanEstimate((IndexScanState *) planstate,
+ e->pcxt);
/* even when not parallel-aware, for EXPLAIN ANALYZE */
- ExecIndexScanEstimate((IndexScanState *) planstate,
- e->pcxt);
+ ExecIndexScanInstrumentEstimate((IndexScanState *) planstate,
+ e->pcxt);
break;
case T_IndexOnlyScanState:
+ if (planstate->plan->parallel_aware)
+ ExecIndexOnlyScanEstimate((IndexOnlyScanState *) planstate,
+ e->pcxt);
/* even when not parallel-aware, for EXPLAIN ANALYZE */
- ExecIndexOnlyScanEstimate((IndexOnlyScanState *) planstate,
- e->pcxt);
+ ExecIndexOnlyScanInstrumentEstimate((IndexOnlyScanState *) planstate,
+ e->pcxt);
break;
case T_BitmapIndexScanState:
/* even when not parallel-aware, for EXPLAIN ANALYZE */
d->pcxt);
break;
case T_IndexScanState:
+ if (planstate->plan->parallel_aware)
+ ExecIndexScanInitializeDSM((IndexScanState *) planstate,
+ d->pcxt);
/* even when not parallel-aware, for EXPLAIN ANALYZE */
- ExecIndexScanInitializeDSM((IndexScanState *) planstate, d->pcxt);
+ ExecIndexScanInstrumentInitDSM((IndexScanState *) planstate,
+ d->pcxt);
break;
case T_IndexOnlyScanState:
+ if (planstate->plan->parallel_aware)
+ ExecIndexOnlyScanInitializeDSM((IndexOnlyScanState *) planstate,
+ d->pcxt);
/* even when not parallel-aware, for EXPLAIN ANALYZE */
- ExecIndexOnlyScanInitializeDSM((IndexOnlyScanState *) planstate,
- d->pcxt);
+ ExecIndexOnlyScanInstrumentInitDSM((IndexOnlyScanState *) planstate,
+ d->pcxt);
break;
case T_BitmapIndexScanState:
/* even when not parallel-aware, for EXPLAIN ANALYZE */
ExecSeqScanInitializeWorker((SeqScanState *) planstate, pwcxt);
break;
case T_IndexScanState:
+ if (planstate->plan->parallel_aware)
+ ExecIndexScanInitializeWorker((IndexScanState *) planstate,
+ pwcxt);
/* even when not parallel-aware, for EXPLAIN ANALYZE */
- ExecIndexScanInitializeWorker((IndexScanState *) planstate, pwcxt);
+ ExecIndexScanInstrumentInitWorker((IndexScanState *) planstate,
+ pwcxt);
break;
case T_IndexOnlyScanState:
+ if (planstate->plan->parallel_aware)
+ ExecIndexOnlyScanInitializeWorker((IndexOnlyScanState *) planstate,
+ pwcxt);
/* even when not parallel-aware, for EXPLAIN ANALYZE */
- ExecIndexOnlyScanInitializeWorker((IndexOnlyScanState *) planstate,
- pwcxt);
+ ExecIndexOnlyScanInstrumentInitWorker((IndexOnlyScanState *) planstate,
+ pwcxt);
break;
case T_BitmapIndexScanState:
/* even when not parallel-aware, for EXPLAIN ANALYZE */
node->biss_SharedInfo =
(SharedIndexScanInstrumentation *) shm_toc_allocate(pcxt->toc,
size);
- shm_toc_insert(pcxt->toc, node->ss.ps.plan->plan_node_id,
+ shm_toc_insert(pcxt->toc,
+ node->ss.ps.plan->plan_node_id +
+ PARALLEL_KEY_SCAN_INSTRUMENT_OFFSET,
node->biss_SharedInfo);
/* Each per-worker area must start out as zeroes */
return;
node->biss_SharedInfo = (SharedIndexScanInstrumentation *)
- shm_toc_lookup(pwcxt->toc, node->ss.ps.plan->plan_node_id, false);
+ shm_toc_lookup(pwcxt->toc,
+ node->ss.ps.plan->plan_node_id +
+ PARALLEL_KEY_SCAN_INSTRUMENT_OFFSET,
+ false);
}
/* ----------------------------------------------------------------
ParallelContext *pcxt)
{
EState *estate = node->ss.ps.state;
- bool instrument = (node->ss.ps.instrument != NULL);
- bool parallel_aware = node->ss.ps.plan->parallel_aware;
-
- if (!instrument && !parallel_aware)
- {
- /* No DSM required by the scan */
- return;
- }
node->ioss_PscanLen = index_parallelscan_estimate(node->ioss_RelationDesc,
node->ioss_NumScanKeys,
node->ioss_NumOrderByKeys,
- estate->es_snapshot,
- instrument, parallel_aware,
- pcxt->nworkers);
+ estate->es_snapshot);
shm_toc_estimate_chunk(&pcxt->estimator, node->ioss_PscanLen);
shm_toc_estimate_keys(&pcxt->estimator, 1);
}
{
EState *estate = node->ss.ps.state;
ParallelIndexScanDesc piscan;
- bool instrument = node->ss.ps.instrument != NULL;
- bool parallel_aware = node->ss.ps.plan->parallel_aware;
-
- if (!instrument && !parallel_aware)
- {
- /* No DSM required by the scan */
- return;
- }
piscan = shm_toc_allocate(pcxt->toc, node->ioss_PscanLen);
index_parallelscan_initialize(node->ss.ss_currentRelation,
node->ioss_RelationDesc,
estate->es_snapshot,
- instrument, parallel_aware, pcxt->nworkers,
- &node->ioss_SharedInfo, piscan);
+ piscan);
shm_toc_insert(pcxt->toc, node->ss.ps.plan->plan_node_id, piscan);
- if (!parallel_aware)
- {
- /* Only here to initialize SharedInfo in DSM */
- return;
- }
-
node->ioss_ScanDesc =
index_beginscan_parallel(node->ss.ss_currentRelation,
node->ioss_RelationDesc,
ParallelWorkerContext *pwcxt)
{
ParallelIndexScanDesc piscan;
- bool instrument = node->ss.ps.instrument != NULL;
- bool parallel_aware = node->ss.ps.plan->parallel_aware;
-
- if (!instrument && !parallel_aware)
- {
- /* No DSM required by the scan */
- return;
- }
piscan = shm_toc_lookup(pwcxt->toc, node->ss.ps.plan->plan_node_id, false);
- if (instrument)
- node->ioss_SharedInfo = (SharedIndexScanInstrumentation *)
- OffsetToPointer(piscan, piscan->ps_offset_ins);
-
- if (!parallel_aware)
- {
- /* Only here to set up worker node's SharedInfo */
- return;
- }
-
node->ioss_ScanDesc =
index_beginscan_parallel(node->ss.ss_currentRelation,
node->ioss_RelationDesc,
node->ioss_OrderByKeys, node->ioss_NumOrderByKeys);
}
+/*
+ * Compute the amount of space we'll need for the shared instrumentation and
+ * inform pcxt->estimator.
+ */
+void
+ExecIndexOnlyScanInstrumentEstimate(IndexOnlyScanState *node,
+ ParallelContext *pcxt)
+{
+ Size size;
+
+ if (!node->ss.ps.instrument || pcxt->nworkers == 0)
+ return;
+
+ /*
+ * This size calculation is trivial enough that we don't bother saving it
+ * in the IndexOnlyScanState. We'll recalculate the needed size in
+ * ExecIndexOnlyScanInstrumentInitDSM().
+ */
+ size = offsetof(SharedIndexScanInstrumentation, winstrument) +
+ pcxt->nworkers * sizeof(IndexScanInstrumentation);
+ shm_toc_estimate_chunk(&pcxt->estimator, size);
+ shm_toc_estimate_keys(&pcxt->estimator, 1);
+}
+
+/*
+ * Set up parallel index-only scan instrumentation.
+ */
+void
+ExecIndexOnlyScanInstrumentInitDSM(IndexOnlyScanState *node,
+ ParallelContext *pcxt)
+{
+ Size size;
+
+ if (!node->ss.ps.instrument || pcxt->nworkers == 0)
+ return;
+
+ size = offsetof(SharedIndexScanInstrumentation, winstrument) +
+ pcxt->nworkers * sizeof(IndexScanInstrumentation);
+ node->ioss_SharedInfo =
+ (SharedIndexScanInstrumentation *) shm_toc_allocate(pcxt->toc, size);
+
+ /* Each per-worker area must start out as zeroes */
+ memset(node->ioss_SharedInfo, 0, size);
+ node->ioss_SharedInfo->num_workers = pcxt->nworkers;
+ shm_toc_insert(pcxt->toc,
+ node->ss.ps.plan->plan_node_id +
+ PARALLEL_KEY_SCAN_INSTRUMENT_OFFSET,
+ node->ioss_SharedInfo);
+}
+
+/*
+ * Look up and save the location of the shared instrumentation.
+ */
+void
+ExecIndexOnlyScanInstrumentInitWorker(IndexOnlyScanState *node,
+ ParallelWorkerContext *pwcxt)
+{
+ if (!node->ss.ps.instrument)
+ return;
+
+ node->ioss_SharedInfo = (SharedIndexScanInstrumentation *)
+ shm_toc_lookup(pwcxt->toc,
+ node->ss.ps.plan->plan_node_id +
+ PARALLEL_KEY_SCAN_INSTRUMENT_OFFSET,
+ false);
+}
+
/* ----------------------------------------------------------------
* ExecIndexOnlyScanRetrieveInstrumentation
*
ParallelContext *pcxt)
{
EState *estate = node->ss.ps.state;
- bool instrument = node->ss.ps.instrument != NULL;
- bool parallel_aware = node->ss.ps.plan->parallel_aware;
-
- if (!instrument && !parallel_aware)
- {
- /* No DSM required by the scan */
- return;
- }
node->iss_PscanLen = index_parallelscan_estimate(node->iss_RelationDesc,
node->iss_NumScanKeys,
node->iss_NumOrderByKeys,
- estate->es_snapshot,
- instrument, parallel_aware,
- pcxt->nworkers);
+ estate->es_snapshot);
shm_toc_estimate_chunk(&pcxt->estimator, node->iss_PscanLen);
shm_toc_estimate_keys(&pcxt->estimator, 1);
}
{
EState *estate = node->ss.ps.state;
ParallelIndexScanDesc piscan;
- bool instrument = node->ss.ps.instrument != NULL;
- bool parallel_aware = node->ss.ps.plan->parallel_aware;
-
- if (!instrument && !parallel_aware)
- {
- /* No DSM required by the scan */
- return;
- }
piscan = shm_toc_allocate(pcxt->toc, node->iss_PscanLen);
index_parallelscan_initialize(node->ss.ss_currentRelation,
node->iss_RelationDesc,
estate->es_snapshot,
- instrument, parallel_aware, pcxt->nworkers,
- &node->iss_SharedInfo, piscan);
+ piscan);
shm_toc_insert(pcxt->toc, node->ss.ps.plan->plan_node_id, piscan);
- if (!parallel_aware)
- {
- /* Only here to initialize SharedInfo in DSM */
- return;
- }
-
node->iss_ScanDesc =
index_beginscan_parallel(node->ss.ss_currentRelation,
node->iss_RelationDesc,
ParallelWorkerContext *pwcxt)
{
ParallelIndexScanDesc piscan;
- bool instrument = node->ss.ps.instrument != NULL;
- bool parallel_aware = node->ss.ps.plan->parallel_aware;
-
- if (!instrument && !parallel_aware)
- {
- /* No DSM required by the scan */
- return;
- }
piscan = shm_toc_lookup(pwcxt->toc, node->ss.ps.plan->plan_node_id, false);
- if (instrument)
- node->iss_SharedInfo = (SharedIndexScanInstrumentation *)
- OffsetToPointer(piscan, piscan->ps_offset_ins);
-
- if (!parallel_aware)
- {
- /* Only here to set up worker node's SharedInfo */
- return;
- }
-
node->iss_ScanDesc =
index_beginscan_parallel(node->ss.ss_currentRelation,
node->iss_RelationDesc,
node->iss_OrderByKeys, node->iss_NumOrderByKeys);
}
+/*
+ * Compute the amount of space we'll need for the shared instrumentation and
+ * inform pcxt->estimator.
+ */
+void
+ExecIndexScanInstrumentEstimate(IndexScanState *node,
+ ParallelContext *pcxt)
+{
+ Size size;
+
+ if (!node->ss.ps.instrument || pcxt->nworkers == 0)
+ return;
+
+ /*
+ * This size calculation is trivial enough that we don't bother saving it
+ * in the IndexScanState. We'll recalculate the needed size in
+ * ExecIndexScanInstrumentInitDSM().
+ */
+ size = offsetof(SharedIndexScanInstrumentation, winstrument) +
+ pcxt->nworkers * sizeof(IndexScanInstrumentation);
+ shm_toc_estimate_chunk(&pcxt->estimator, size);
+ shm_toc_estimate_keys(&pcxt->estimator, 1);
+}
+
+/*
+ * Set up parallel index scan instrumentation.
+ */
+void
+ExecIndexScanInstrumentInitDSM(IndexScanState *node,
+ ParallelContext *pcxt)
+{
+ Size size;
+
+ if (!node->ss.ps.instrument || pcxt->nworkers == 0)
+ return;
+
+ size = offsetof(SharedIndexScanInstrumentation, winstrument) +
+ pcxt->nworkers * sizeof(IndexScanInstrumentation);
+ node->iss_SharedInfo =
+ (SharedIndexScanInstrumentation *) shm_toc_allocate(pcxt->toc, size);
+
+ /* Each per-worker area must start out as zeroes */
+ memset(node->iss_SharedInfo, 0, size);
+ node->iss_SharedInfo->num_workers = pcxt->nworkers;
+ shm_toc_insert(pcxt->toc,
+ node->ss.ps.plan->plan_node_id +
+ PARALLEL_KEY_SCAN_INSTRUMENT_OFFSET,
+ node->iss_SharedInfo);
+}
+
+/*
+ * Look up and save the location of the shared instrumentation.
+ */
+void
+ExecIndexScanInstrumentInitWorker(IndexScanState *node,
+ ParallelWorkerContext *pwcxt)
+{
+ if (!node->ss.ps.instrument)
+ return;
+
+ node->iss_SharedInfo = (SharedIndexScanInstrumentation *)
+ shm_toc_lookup(pwcxt->toc,
+ node->ss.ps.plan->plan_node_id +
+ PARALLEL_KEY_SCAN_INSTRUMENT_OFFSET,
+ false);
+}
+
/* ----------------------------------------------------------------
* ExecIndexScanRetrieveInstrumentation
*
extern void index_markpos(IndexScanDesc scan);
extern void index_restrpos(IndexScanDesc scan);
extern Size index_parallelscan_estimate(Relation indexRelation,
- int nkeys, int norderbys, Snapshot snapshot,
- bool instrument, bool parallel_aware,
- int nworkers);
+ int nkeys, int norderbys, Snapshot snapshot);
extern void index_parallelscan_initialize(Relation heapRelation,
Relation indexRelation, Snapshot snapshot,
- bool instrument, bool parallel_aware,
- int nworkers,
- SharedIndexScanInstrumentation **sharedinfo,
ParallelIndexScanDesc target);
extern void index_parallelrescan(IndexScanDesc scan);
extern IndexScanDesc index_beginscan_parallel(Relation heaprel,
{
RelFileLocator ps_locator; /* physical table relation to scan */
RelFileLocator ps_indexlocator; /* physical index relation to scan */
- Size ps_offset_ins; /* Offset to SharedIndexScanInstrumentation */
Size ps_offset_am; /* Offset to am-specific structure */
char ps_snapshot_data[FLEXIBLE_ARRAY_MEMBER];
} ParallelIndexScanDescData;
#ifndef INSTRUMENT_NODE_H
#define INSTRUMENT_NODE_H
+/*
+ * Offset added to plan_node_id to create a second TOC key for per-worker scan
+ * instrumentation. Instrumentation and parallel-awareness are independent, so
+ * separate DSM chunks let each be allocated and initialized only when needed.
+ * In the future, if nodes need more DSM allocations, we would need a more
+ * robust system.
+ */
+#define PARALLEL_KEY_SCAN_INSTRUMENT_OFFSET UINT64CONST(0xD000000000000000)
/* ---------------------
* Instrumentation information for aggregate function execution
ParallelContext *pcxt);
extern void ExecIndexOnlyScanInitializeWorker(IndexOnlyScanState *node,
ParallelWorkerContext *pwcxt);
+extern void ExecIndexOnlyScanInstrumentEstimate(IndexOnlyScanState *node,
+ ParallelContext *pcxt);
+extern void ExecIndexOnlyScanInstrumentInitDSM(IndexOnlyScanState *node,
+ ParallelContext *pcxt);
+extern void ExecIndexOnlyScanInstrumentInitWorker(IndexOnlyScanState *node,
+ ParallelWorkerContext *pwcxt);
extern void ExecIndexOnlyScanRetrieveInstrumentation(IndexOnlyScanState *node);
#endif /* NODEINDEXONLYSCAN_H */
extern void ExecIndexScanReInitializeDSM(IndexScanState *node, ParallelContext *pcxt);
extern void ExecIndexScanInitializeWorker(IndexScanState *node,
ParallelWorkerContext *pwcxt);
+extern void ExecIndexScanInstrumentEstimate(IndexScanState *node,
+ ParallelContext *pcxt);
+extern void ExecIndexScanInstrumentInitDSM(IndexScanState *node,
+ ParallelContext *pcxt);
+extern void ExecIndexScanInstrumentInitWorker(IndexScanState *node,
+ ParallelWorkerContext *pwcxt);
extern void ExecIndexScanRetrieveInstrumentation(IndexScanState *node);
/*