if (planstate->plan->parallel_aware)
ExecSeqScanEstimate((SeqScanState *) planstate,
e->pcxt);
+ /* even when not parallel-aware, for EXPLAIN ANALYZE */
+ ExecSeqScanInstrumentEstimate((SeqScanState *) planstate,
+ e->pcxt);
break;
case T_IndexScanState:
if (planstate->plan->parallel_aware)
if (planstate->plan->parallel_aware)
ExecSeqScanInitializeDSM((SeqScanState *) planstate,
d->pcxt);
+ /* even when not parallel-aware, for EXPLAIN ANALYZE */
+ ExecSeqScanInstrumentInitDSM((SeqScanState *) planstate,
+ d->pcxt);
break;
case T_IndexScanState:
if (planstate->plan->parallel_aware)
case T_BitmapHeapScanState:
ExecBitmapHeapRetrieveInstrumentation((BitmapHeapScanState *) planstate);
break;
+ case T_SeqScanState:
+ ExecSeqScanRetrieveInstrumentation((SeqScanState *) planstate);
+ break;
default:
break;
}
case T_SeqScanState:
if (planstate->plan->parallel_aware)
ExecSeqScanInitializeWorker((SeqScanState *) planstate, pwcxt);
+ /* even when not parallel-aware, for EXPLAIN ANALYZE */
+ ExecSeqScanInstrumentInitWorker((SeqScanState *) planstate, pwcxt);
break;
case T_IndexScanState:
if (planstate->plan->parallel_aware)
#include "access/relscan.h"
#include "access/tableam.h"
+#include "executor/execParallel.h"
#include "executor/execScan.h"
#include "executor/executor.h"
#include "executor/nodeSeqscan.h"
if (scandesc == NULL)
{
+ uint32 flags = SO_NONE;
+
+ if (ScanRelIsReadOnly(&node->ss))
+ flags |= SO_HINT_REL_READ_ONLY;
+
+ if (estate->es_instrument & INSTRUMENT_IO)
+ flags |= SO_SCAN_INSTRUMENT;
+
/*
* We reach here if the scan is not parallel, or if we're serially
* executing a scan that was planned to be parallel.
*/
scandesc = table_beginscan(node->ss.ss_currentRelation,
estate->es_snapshot,
- 0, NULL,
- ScanRelIsReadOnly(&node->ss) ?
- SO_HINT_REL_READ_ONLY : SO_NONE);
+ 0, NULL, flags);
node->ss.ss_currentScanDesc = scandesc;
}
*/
scanDesc = node->ss.ss_currentScanDesc;
+ /*
+ * Collect I/O stats for this process into shared instrumentation.
+ */
+ if (node->sinstrument != NULL && IsParallelWorker())
+ {
+ SeqScanInstrumentation *si;
+
+ Assert(ParallelWorkerNumber < node->sinstrument->num_workers);
+ si = &node->sinstrument->sinstrument[ParallelWorkerNumber];
+
+ if (scanDesc && scanDesc->rs_instrument)
+ {
+ AccumulateIOStats(&si->stats.io, &scanDesc->rs_instrument->io);
+ }
+ }
+
/*
* close heap scan
*/
{
EState *estate = node->ss.ps.state;
ParallelTableScanDesc pscan;
+ uint32 flags = SO_NONE;
+
+ if (ScanRelIsReadOnly(&node->ss))
+ flags |= SO_HINT_REL_READ_ONLY;
+
+ if (estate->es_instrument & INSTRUMENT_IO)
+ flags |= SO_SCAN_INSTRUMENT;
pscan = shm_toc_allocate(pcxt->toc, node->pscan_len);
table_parallelscan_initialize(node->ss.ss_currentRelation,
shm_toc_insert(pcxt->toc, node->ss.ps.plan->plan_node_id, pscan);
node->ss.ss_currentScanDesc =
- table_beginscan_parallel(node->ss.ss_currentRelation, pscan,
- ScanRelIsReadOnly(&node->ss) ?
- SO_HINT_REL_READ_ONLY : SO_NONE);
+ table_beginscan_parallel(node->ss.ss_currentRelation, pscan, flags);
}
/* ----------------------------------------------------------------
ParallelWorkerContext *pwcxt)
{
ParallelTableScanDesc pscan;
+ uint32 flags = SO_NONE;
+
+ if (ScanRelIsReadOnly(&node->ss))
+ flags |= SO_HINT_REL_READ_ONLY;
+
+ if (node->ss.ps.state->es_instrument & INSTRUMENT_IO)
+ flags |= SO_SCAN_INSTRUMENT;
pscan = shm_toc_lookup(pwcxt->toc, node->ss.ps.plan->plan_node_id, false);
node->ss.ss_currentScanDesc =
- table_beginscan_parallel(node->ss.ss_currentRelation, pscan,
- ScanRelIsReadOnly(&node->ss) ?
- SO_HINT_REL_READ_ONLY : SO_NONE);
+ table_beginscan_parallel(node->ss.ss_currentRelation, pscan, flags);
+}
+
+/*
+ * Compute the amount of space we'll need for the shared instrumentation and
+ * inform pcxt->estimator.
+ *
+ * No-op unless I/O instrumentation (INSTRUMENT_IO) was requested and at
+ * least one parallel worker will be launched; otherwise there is nothing
+ * to share.  The size computed here must stay in sync with the allocation
+ * in ExecSeqScanInstrumentInitDSM.
+ */
+void
+ExecSeqScanInstrumentEstimate(SeqScanState *node, ParallelContext *pcxt)
+{
+ EState *estate = node->ss.ps.state;
+ Size size;
+
+ if ((estate->es_instrument & INSTRUMENT_IO) == 0 || pcxt->nworkers == 0)
+ return;
+
+ /* Header plus one SeqScanInstrumentation slot per planned worker. */
+ size = add_size(offsetof(SharedSeqScanInstrumentation, sinstrument),
+ mul_size(pcxt->nworkers, sizeof(SeqScanInstrumentation)));
+
+ shm_toc_estimate_chunk(&pcxt->estimator, size);
+ shm_toc_estimate_keys(&pcxt->estimator, 1);
+}
+
+
+/*
+ * Set up parallel sequential scan instrumentation.
+ *
+ * Allocates and zeroes the shared instrumentation array in the DSM segment
+ * and publishes it in the toc.  Must use the same gating conditions and the
+ * same size arithmetic as ExecSeqScanInstrumentEstimate.  The toc key is
+ * offset by PARALLEL_KEY_SCAN_INSTRUMENT_OFFSET so it does not collide with
+ * the ParallelTableScanDesc entry, which is keyed by the bare plan_node_id.
+ */
+void
+ExecSeqScanInstrumentInitDSM(SeqScanState *node, ParallelContext *pcxt)
+{
+ EState *estate = node->ss.ps.state;
+ SharedSeqScanInstrumentation *sinstrument;
+ Size size;
+
+ if ((estate->es_instrument & INSTRUMENT_IO) == 0 || pcxt->nworkers == 0)
+ return;
+
+ size = add_size(offsetof(SharedSeqScanInstrumentation, sinstrument),
+ mul_size(pcxt->nworkers, sizeof(SeqScanInstrumentation)));
+ sinstrument = shm_toc_allocate(pcxt->toc, size);
+ /* Zero so workers can accumulate into their slots without resetting. */
+ memset(sinstrument, 0, size);
+ sinstrument->num_workers = pcxt->nworkers;
+ shm_toc_insert(pcxt->toc,
+ node->ss.ps.plan->plan_node_id +
+ PARALLEL_KEY_SCAN_INSTRUMENT_OFFSET,
+ sinstrument);
+ node->sinstrument = sinstrument;
+}
+
+
+/*
+ * Look up and save the location of the shared instrumentation.
+ *
+ * Runs in each parallel worker.  Gated on the same INSTRUMENT_IO condition
+ * the leader used in ExecSeqScanInstrumentInitDSM, so when instrumentation
+ * is enabled the toc entry must exist — hence missing_ok = false below.
+ */
+void
+ExecSeqScanInstrumentInitWorker(SeqScanState *node,
+ ParallelWorkerContext *pwcxt)
+{
+ EState *estate = node->ss.ps.state;
+
+ if ((estate->es_instrument & INSTRUMENT_IO) == 0)
+ return;
+
+ node->sinstrument = shm_toc_lookup(pwcxt->toc,
+ node->ss.ps.plan->plan_node_id +
+ PARALLEL_KEY_SCAN_INSTRUMENT_OFFSET,
+ false);
+}
+
+
+/*
+ * Transfer sequential scan instrumentation from DSM to private memory.
+ *
+ * Runs in the leader after workers finish.  Copies the shared array into
+ * backend-local memory (palloc) so the stats survive DSM segment teardown;
+ * node->sinstrument is repointed at the private copy.  No-op if no shared
+ * instrumentation was set up for this node.
+ */
+void
+ExecSeqScanRetrieveInstrumentation(SeqScanState *node)
+{
+ SharedSeqScanInstrumentation *sinstrument = node->sinstrument;
+ Size size;
+
+ if (sinstrument == NULL)
+ return;
+
+ size = offsetof(SharedSeqScanInstrumentation, sinstrument)
+ + sinstrument->num_workers * sizeof(SeqScanInstrumentation);
+
+ node->sinstrument = palloc(size);
+ memcpy(node->sinstrument, sinstrument, size);
}
(1 row)
\a
-select explain_filter('explain (analyze, buffers, format xml) select * from int8_tbl i8');
+select explain_filter('explain (analyze, buffers, io, format xml) select * from int8_tbl i8');
explain_filter
<explain xmlns="http://www.postgresql.org/N/explain">
<Query>
<Actual-Rows>N.N</Actual-Rows>
<Actual-Loops>N</Actual-Loops>
<Disabled>false</Disabled>
+ <Average-Prefetch-Distance>N.N</Average-Prefetch-Distance>
+ <Max-Prefetch-Distance>N</Max-Prefetch-Distance>
+ <Prefetch-Capacity>N</Prefetch-Capacity>
+ <I-O-Count>N</I-O-Count>
+ <I-O-Waits>N</I-O-Waits>
+ <Average-I-O-Size>N.N</Average-I-O-Size>
+ <Average-I-Os-In-Progress>N.N</Average-I-Os-In-Progress>
<Shared-Hit-Blocks>N</Shared-Hit-Blocks>
<Shared-Read-Blocks>N</Shared-Read-Blocks>
<Shared-Dirtied-Blocks>N</Shared-Dirtied-Blocks>
</Query>
</explain>
(1 row)
-select explain_filter('explain (analyze, serialize, buffers, format yaml) select * from int8_tbl i8');
+select explain_filter('explain (analyze, serialize, buffers, io, format yaml) select * from int8_tbl i8');
explain_filter
- Plan:
Node Type: "Seq Scan"
Actual Rows: N.N
Actual Loops: N
Disabled: false
+ Average Prefetch Distance: N.N
+ Max Prefetch Distance: N
+ Prefetch Capacity: N
+ I/O Count: N
+ I/O Waits: N
+ Average I/O Size: N.N
+ Average I/Os In Progress: N.N
Shared Hit Blocks: N
Shared Read Blocks: N
Shared Dirtied Blocks: N