if (planstate->plan->parallel_aware)
ExecTidRangeScanEstimate((TidRangeScanState *) planstate,
e->pcxt);
+ /* even when not parallel-aware, for EXPLAIN ANALYZE */
+ ExecTidRangeScanInstrumentEstimate((TidRangeScanState *) planstate,
+ e->pcxt);
break;
case T_AppendState:
if (planstate->plan->parallel_aware)
if (planstate->plan->parallel_aware)
ExecTidRangeScanInitializeDSM((TidRangeScanState *) planstate,
d->pcxt);
+ /* even when not parallel-aware, for EXPLAIN ANALYZE */
+ ExecTidRangeScanInstrumentInitDSM((TidRangeScanState *) planstate,
+ d->pcxt);
break;
case T_AppendState:
if (planstate->plan->parallel_aware)
case T_SeqScanState:
ExecSeqScanRetrieveInstrumentation((SeqScanState *) planstate);
break;
+ case T_TidRangeScanState:
+ ExecTidRangeScanRetrieveInstrumentation((TidRangeScanState *) planstate);
+ break;
default:
break;
}
if (planstate->plan->parallel_aware)
ExecTidRangeScanInitializeWorker((TidRangeScanState *) planstate,
pwcxt);
+ /* even when not parallel-aware, for EXPLAIN ANALYZE */
+ ExecTidRangeScanInstrumentInitWorker((TidRangeScanState *) planstate,
+ pwcxt);
break;
case T_AppendState:
if (planstate->plan->parallel_aware)
#include "access/sysattr.h"
#include "access/tableam.h"
#include "catalog/pg_operator.h"
+#include "executor/execParallel.h"
#include "executor/executor.h"
+#include "executor/instrument.h"
#include "executor/nodeTidrangescan.h"
#include "nodes/nodeFuncs.h"
#include "utils/rel.h"
if (scandesc == NULL)
{
+ uint32 flags = SO_NONE;
+
+ if (ScanRelIsReadOnly(&node->ss))
+ flags |= SO_HINT_REL_READ_ONLY;
+
+ if (estate->es_instrument & INSTRUMENT_IO)
+ flags |= SO_SCAN_INSTRUMENT;
+
scandesc = table_beginscan_tidrange(node->ss.ss_currentRelation,
estate->es_snapshot,
&node->trss_mintid,
&node->trss_maxtid,
- ScanRelIsReadOnly(&node->ss) ?
- SO_HINT_REL_READ_ONLY : SO_NONE);
+ flags);
node->ss.ss_currentScanDesc = scandesc;
}
else
{
TableScanDesc scan = node->ss.ss_currentScanDesc;
+ /* Collect IO stats for this process into shared instrumentation */
+ if (node->trss_sinstrument != NULL && IsParallelWorker())
+ {
+ TidRangeScanInstrumentation *si;
+
+ Assert(ParallelWorkerNumber < node->trss_sinstrument->num_workers);
+ si = &node->trss_sinstrument->sinstrument[ParallelWorkerNumber];
+
+ if (scan && scan->rs_instrument)
+ {
+ AccumulateIOStats(&si->stats.io, &scan->rs_instrument->io);
+ }
+ }
+
if (scan != NULL)
table_endscan(scan);
}
{
EState *estate = node->ss.ps.state;
ParallelTableScanDesc pscan;
+ uint32 flags = SO_NONE;
+
+ if (ScanRelIsReadOnly(&node->ss))
+ flags |= SO_HINT_REL_READ_ONLY;
+
+ if (estate->es_instrument & INSTRUMENT_IO)
+ flags |= SO_SCAN_INSTRUMENT;
pscan = shm_toc_allocate(pcxt->toc, node->trss_pscanlen);
table_parallelscan_initialize(node->ss.ss_currentRelation,
shm_toc_insert(pcxt->toc, node->ss.ps.plan->plan_node_id, pscan);
node->ss.ss_currentScanDesc =
table_beginscan_parallel_tidrange(node->ss.ss_currentRelation,
- pscan,
- ScanRelIsReadOnly(&node->ss) ?
- SO_HINT_REL_READ_ONLY : SO_NONE);
+ pscan, flags);
}
/* ----------------------------------------------------------------
ParallelWorkerContext *pwcxt)
{
ParallelTableScanDesc pscan;
+ uint32 flags = SO_NONE;
+
+ if (ScanRelIsReadOnly(&node->ss))
+ flags |= SO_HINT_REL_READ_ONLY;
+
+ if (node->ss.ps.state->es_instrument & INSTRUMENT_IO)
+ flags |= SO_SCAN_INSTRUMENT;
pscan = shm_toc_lookup(pwcxt->toc, node->ss.ps.plan->plan_node_id, false);
node->ss.ss_currentScanDesc =
table_beginscan_parallel_tidrange(node->ss.ss_currentRelation,
- pscan,
- ScanRelIsReadOnly(&node->ss) ?
- SO_HINT_REL_READ_ONLY : SO_NONE);
+ pscan, flags);
+}
+
+/*
+ * Compute the amount of space we'll need for the shared instrumentation and
+ * inform pcxt->estimator.
+ */
+void
+ExecTidRangeScanInstrumentEstimate(TidRangeScanState *node,
+ ParallelContext *pcxt)
+{
+ EState *estate = node->ss.ps.state;
+ Size size;
+
+ if ((estate->es_instrument & INSTRUMENT_IO) == 0 || pcxt->nworkers == 0)
+ return;
+
+ size = add_size(offsetof(SharedTidRangeScanInstrumentation, sinstrument),
+ mul_size(pcxt->nworkers, sizeof(TidRangeScanInstrumentation)));
+
+ shm_toc_estimate_chunk(&pcxt->estimator, size);
+ shm_toc_estimate_keys(&pcxt->estimator, 1);
+}
+
+/*
+ * Set up parallel scan instrumentation.
+ */
+void
+ExecTidRangeScanInstrumentInitDSM(TidRangeScanState *node,
+ ParallelContext *pcxt)
+{
+ EState *estate = node->ss.ps.state;
+ SharedTidRangeScanInstrumentation *sinstrument;
+ Size size;
+
+ if ((estate->es_instrument & INSTRUMENT_IO) == 0 || pcxt->nworkers == 0)
+ return;
+
+ size = add_size(offsetof(SharedTidRangeScanInstrumentation, sinstrument),
+ mul_size(pcxt->nworkers, sizeof(TidRangeScanInstrumentation)));
+ sinstrument = shm_toc_allocate(pcxt->toc, size);
+ memset(sinstrument, 0, size);
+ sinstrument->num_workers = pcxt->nworkers;
+ shm_toc_insert(pcxt->toc,
+ node->ss.ps.plan->plan_node_id +
+ PARALLEL_KEY_SCAN_INSTRUMENT_OFFSET,
+ sinstrument);
+ node->trss_sinstrument = sinstrument;
+}
+
+/*
+ * Look up and save the location of the shared instrumentation.
+ */
+void
+ExecTidRangeScanInstrumentInitWorker(TidRangeScanState *node,
+ ParallelWorkerContext *pwcxt)
+{
+ EState *estate = node->ss.ps.state;
+
+ if ((estate->es_instrument & INSTRUMENT_IO) == 0)
+ return;
+
+ node->trss_sinstrument = shm_toc_lookup(pwcxt->toc,
+ node->ss.ps.plan->plan_node_id +
+ PARALLEL_KEY_SCAN_INSTRUMENT_OFFSET,
+ false);
+}
+
+/*
+ * Transfer scan instrumentation from DSM to private memory.
+ */
+void
+ExecTidRangeScanRetrieveInstrumentation(TidRangeScanState *node)
+{
+ SharedTidRangeScanInstrumentation *sinstrument = node->trss_sinstrument;
+ Size size;
+
+ if (sinstrument == NULL)
+ return;
+
+ size = offsetof(SharedTidRangeScanInstrumentation, sinstrument)
+ + sinstrument->num_workers * sizeof(TidRangeScanInstrumentation);
+
+ node->trss_sinstrument = palloc(size);
+ memcpy(node->trss_sinstrument, sinstrument, size);
}