diff --git a/doc/src/sgml/monitoring.sgml b/doc/src/sgml/monitoring.sgml
index 1d70fae..6a3481f 100644
--- a/doc/src/sgml/monitoring.sgml
+++ b/doc/src/sgml/monitoring.sgml
@@ -1248,6 +1248,10 @@ postgres 27093 0.0 0.0 30096 2752 ? Ss 11:34 0:00 postgres: ser
Waiting for parallel workers to finish computing.
+ ParallelBitmapScan>
+ Waiting for the leader to populate the TIDBitmap.
+
+
SafeSnapshot>
Waiting for a snapshot for a READ ONLY DEFERRABLE> transaction.
diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c
index af25836..bffc971 100644
--- a/src/backend/access/heap/heapam.c
+++ b/src/backend/access/heap/heapam.c
@@ -1754,6 +1754,22 @@ retry:
}
/* ----------------
+ * heap_update_snapshot
+ *
+ * Update snapshot info in heap scan descriptor.
+ * ----------------
+ */
+void
+heap_update_snapshot(HeapScanDesc scan, Snapshot snapshot)
+{
+ Assert(IsMVCCSnapshot(snapshot));
+
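+ /*
+ * Register the snapshot and mark it as temporary so that heap_endscan()
+ * will unregister it.
+ */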
+ RegisterSnapshot(snapshot);
+ scan->rs_snapshot = snapshot;
+ scan->rs_temp_snap = true;
+}
+
+/* ----------------
* heap_getnext - retrieve next tuple in scan
*
* Fix to work with index relations.
diff --git a/src/backend/executor/execParallel.c b/src/backend/executor/execParallel.c
index de0e2ba..a1289e5 100644
--- a/src/backend/executor/execParallel.c
+++ b/src/backend/executor/execParallel.c
@@ -25,6 +25,7 @@
#include "executor/execParallel.h"
#include "executor/executor.h"
+#include "executor/nodeBitmapHeapscan.h"
#include "executor/nodeCustom.h"
#include "executor/nodeForeignscan.h"
#include "executor/nodeSeqscan.h"
@@ -217,6 +218,10 @@ ExecParallelEstimate(PlanState *planstate, ExecParallelEstimateContext *e)
ExecCustomScanEstimate((CustomScanState *) planstate,
e->pcxt);
break;
+ case T_BitmapHeapScanState:
+ ExecBitmapHeapEstimate((BitmapHeapScanState *) planstate,
+ e->pcxt);
+ break;
default:
break;
}
@@ -277,6 +282,11 @@ ExecParallelInitializeDSM(PlanState *planstate,
ExecCustomScanInitializeDSM((CustomScanState *) planstate,
d->pcxt);
break;
+ case T_BitmapHeapScanState:
+ ExecBitmapHeapInitializeDSM((BitmapHeapScanState *) planstate,
+ d->pcxt);
+ break;
default:
break;
}
@@ -775,6 +785,10 @@ ExecParallelInitializeWorker(PlanState *planstate, shm_toc *toc)
ExecCustomScanInitializeWorker((CustomScanState *) planstate,
toc);
break;
+ case T_BitmapHeapScanState:
+ ExecBitmapHeapInitializeWorker(
+ (BitmapHeapScanState *) planstate, toc);
+ break;
default:
break;
}
diff --git a/src/backend/executor/nodeBitmapHeapscan.c b/src/backend/executor/nodeBitmapHeapscan.c
index c1aa9f1..4864691 100644
--- a/src/backend/executor/nodeBitmapHeapscan.c
+++ b/src/backend/executor/nodeBitmapHeapscan.c
@@ -58,6 +58,8 @@ static inline void BitmapAdjustPrefetchIterator(BitmapHeapScanState *node,
static inline void BitmapAdjustPrefetchTarget(BitmapHeapScanState *node);
static inline void BitmapPrefetch(BitmapHeapScanState *node,
HeapScanDesc scan);
+static bool BitmapShouldInitializeSharedState(
+ ParallelBitmapHeapState *pstate);
/* ----------------------------------------------------------------
@@ -73,9 +75,12 @@ BitmapHeapNext(BitmapHeapScanState *node)
HeapScanDesc scan;
TIDBitmap *tbm;
TBMIterator *tbmiterator;
+ TBMSharedIterator *shared_tbmiterator = NULL;
TBMIterateResult *tbmres;
OffsetNumber targoffset;
TupleTableSlot *slot;
+ ParallelBitmapHeapState *pstate = node->pstate;
+ dsa_area *dsa = node->ss.ps.state->es_query_dsa;
/*
* extract necessary information from index scan node
@@ -84,7 +89,10 @@ BitmapHeapNext(BitmapHeapScanState *node)
slot = node->ss.ss_ScanTupleSlot;
scan = node->ss.ss_currentScanDesc;
tbm = node->tbm;
- tbmiterator = node->tbmiterator;
+ if (pstate == NULL)
+ tbmiterator = node->tbmiterator;
+ else
+ shared_tbmiterator = node->shared_tbmiterator;
tbmres = node->tbmres;
/*
@@ -99,25 +107,89 @@ BitmapHeapNext(BitmapHeapScanState *node)
* node->prefetch_maximum. This is to avoid doing a lot of prefetching in
* a scan that stops after a few tuples because of a LIMIT.
*/
- if (tbm == NULL)
+ if (!node->initialized)
{
- tbm = (TIDBitmap *) MultiExecProcNode(outerPlanState(node));
+ if (!pstate)
+ {
+ tbm = (TIDBitmap *) MultiExecProcNode(outerPlanState(node));
- if (!tbm || !IsA(tbm, TIDBitmap))
- elog(ERROR, "unrecognized result from subplan");
+ if (!tbm || !IsA(tbm, TIDBitmap))
+ elog(ERROR, "unrecognized result from subplan");
- node->tbm = tbm;
- node->tbmiterator = tbmiterator = tbm_begin_iterate(tbm);
- node->tbmres = tbmres = NULL;
+ node->tbm = tbm;
+ node->tbmiterator = tbmiterator = tbm_begin_iterate(tbm);
+ node->tbmres = tbmres = NULL;
#ifdef USE_PREFETCH
- if (node->prefetch_maximum > 0)
- {
- node->prefetch_iterator = tbm_begin_iterate(tbm);
- node->prefetch_pages = 0;
- node->prefetch_target = -1;
+ if (node->prefetch_maximum > 0)
+ {
+ node->prefetch_iterator = tbm_begin_iterate(tbm);
+ node->prefetch_pages = 0;
+ node->prefetch_target = -1;
+ }
+#endif /* USE_PREFETCH */
}
+ else
+ {
+ /*
+ * The leader will come out of this function immediately, while
+ * the others will be blocked until the leader populates the TBM
+ * and wakes them up.
+ */
+ if (BitmapShouldInitializeSharedState(pstate))
+ {
+ tbm = (TIDBitmap *) MultiExecProcNode(outerPlanState(node));
+ if (!tbm || !IsA(tbm, TIDBitmap))
+ elog(ERROR, "unrecognized result from subplan");
+
+ node->tbm = tbm;
+
+ /*
+ * Prepare to iterate over the TBM. This will return the
+ * dsa_pointer of the iterator state which will be used by
+ * multiple processes to iterate jointly.
+ */
+ pstate->tbmiterator = tbm_prepare_shared_iterate(tbm);
+#ifdef USE_PREFETCH
+ if (node->prefetch_maximum > 0)
+ {
+ pstate->prefetch_iterator =
+ tbm_prepare_shared_iterate(tbm);
+
+ /*
+ * We don't need the mutex here as we haven't yet woken up
+ * the others.
+ */
+ pstate->prefetch_pages = 0;
+ pstate->prefetch_target = -1;
+ }
+#endif
+
+ /*
+ * By this time we have already populated the TBM and
+ * initialized the shared iterators, so set the state to
+ * BM_FINISHED and wake up the others.
+ */
+ SpinLockAcquire(&pstate->mutex);
+ pstate->state = BM_FINISHED;
+ SpinLockRelease(&pstate->mutex);
+ ConditionVariableBroadcast(&pstate->cv);
+ }
+
+ /* Allocate a private iterator and attach the shared state to it */
+ node->shared_tbmiterator = shared_tbmiterator =
+ tbm_attach_shared_iterate(dsa, pstate->tbmiterator);
+ node->tbmres = tbmres = NULL;
+
+#ifdef USE_PREFETCH
+ if (node->prefetch_maximum > 0)
+ {
+ node->shared_prefetch_iterator =
+ tbm_attach_shared_iterate(dsa, pstate->prefetch_iterator);
+ }
#endif /* USE_PREFETCH */
+ }
+ node->initialized = true;
}
for (;;)
@@ -130,7 +202,14 @@ BitmapHeapNext(BitmapHeapScanState *node)
*/
if (tbmres == NULL)
{
- node->tbmres = tbmres = tbm_iterate(tbmiterator);
+ /*
+ * In parallel mode, advance the shared iterator; otherwise,
+ * advance the local one.
+ */
+ if (!pstate)
+ node->tbmres = tbmres = tbm_iterate(tbmiterator);
+ else
+ node->tbmres = tbmres = tbm_shared_iterate(shared_tbmiterator);
if (tbmres == NULL)
{
/* no more entries in the bitmap */
@@ -182,8 +261,22 @@ BitmapHeapNext(BitmapHeapScanState *node)
* Try to prefetch at least a few pages even before we get to the
* second page if we don't stop reading after the first tuple.
*/
- if (node->prefetch_target < node->prefetch_maximum)
- node->prefetch_target++;
+ if (!pstate)
+ {
+ if (node->prefetch_target < node->prefetch_maximum)
+ node->prefetch_target++;
+ }
+ else if (pstate->prefetch_target < node->prefetch_maximum)
+ {
+ /*
+ * In parallel mode, recheck and update the prefetch target
+ * under the mutex.
+ */
+ SpinLockAcquire(&pstate->mutex);
+ if (pstate->prefetch_target < node->prefetch_maximum)
+ pstate->prefetch_target++;
+ SpinLockRelease(&pstate->mutex);
+ }
#endif /* USE_PREFETCH */
}
@@ -369,20 +462,53 @@ BitmapAdjustPrefetchIterator(BitmapHeapScanState *node,
TBMIterateResult *tbmres)
{
#ifdef USE_PREFETCH
- TBMIterator *prefetch_iterator = node->prefetch_iterator;
+ ParallelBitmapHeapState *pstate = node->pstate;
- if (node->prefetch_pages > 0)
+ if (pstate == NULL)
{
- /* The main iterator has closed the distance by one page */
- node->prefetch_pages--;
+ TBMIterator *prefetch_iterator = node->prefetch_iterator;
+
+ if (node->prefetch_pages > 0)
+ {
+ /* The main iterator has closed the distance by one page */
+ node->prefetch_pages--;
+ }
+ else if (prefetch_iterator)
+ {
+ /* Do not let the prefetch iterator get behind the main one */
+ TBMIterateResult *tbmpre = tbm_iterate(prefetch_iterator);
+
+ if (tbmpre == NULL || tbmpre->blockno != tbmres->blockno)
+ elog(ERROR, "prefetch and main iterators are out of sync");
+ }
+ return;
}
- else if (prefetch_iterator)
+
+ if (node->prefetch_maximum > 0)
{
- /* Do not let the prefetch iterator get behind the main one */
- TBMIterateResult *tbmpre = tbm_iterate(prefetch_iterator);
+ TBMSharedIterator *prefetch_iterator = node->shared_prefetch_iterator;
+
+ SpinLockAcquire(&pstate->mutex);
+ if (pstate->prefetch_pages > 0)
+ {
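+ /* The main iterator has closed the distance by one page */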
+ pstate->prefetch_pages--;
+ SpinLockRelease(&pstate->mutex);
+ }
+ else
+ {
+ /* Release the mutex before iterating */
+ SpinLockRelease(&pstate->mutex);
- if (tbmpre == NULL || tbmpre->blockno != tbmres->blockno)
- elog(ERROR, "prefetch and main iterators are out of sync");
+ /*
+ * In shared mode, we cannot ensure that the current blockno of
+ * the main iterator and that of the prefetch iterator are the
+ * same. It's possible that whatever blockno we are prefetching
+ * will be processed by another process. Therefore, we don't
+ * validate the blockno here as we do in the non-parallel case.
+ */
+ if (prefetch_iterator)
+ tbm_shared_iterate(prefetch_iterator);
+ }
}
#endif /* USE_PREFETCH */
}
@@ -399,14 +525,38 @@ static inline void
BitmapAdjustPrefetchTarget(BitmapHeapScanState *node)
{
#ifdef USE_PREFETCH
- if (node->prefetch_target >= node->prefetch_maximum)
- /* don't increase any further */ ;
- else if (node->prefetch_target >= node->prefetch_maximum / 2)
- node->prefetch_target = node->prefetch_maximum;
- else if (node->prefetch_target > 0)
- node->prefetch_target *= 2;
- else
- node->prefetch_target++;
+ ParallelBitmapHeapState *pstate = node->pstate;
+
+ if (pstate == NULL)
+ {
+ if (node->prefetch_target >= node->prefetch_maximum)
+ /* don't increase any further */ ;
+ else if (node->prefetch_target >= node->prefetch_maximum / 2)
+ node->prefetch_target = node->prefetch_maximum;
+ else if (node->prefetch_target > 0)
+ node->prefetch_target *= 2;
+ else
+ node->prefetch_target++;
+ return;
+ }
+
+ /*
+ * Check without the mutex first, so that we can avoid acquiring
+ * it when the target has already reached its maximum value.
+ */
+ if (pstate->prefetch_target < node->prefetch_maximum)
+ {
+ SpinLockAcquire(&pstate->mutex);
+ if (pstate->prefetch_target >= node->prefetch_maximum)
+ /* don't increase any further */ ;
+ else if (pstate->prefetch_target >= node->prefetch_maximum / 2)
+ pstate->prefetch_target = node->prefetch_maximum;
+ else if (pstate->prefetch_target > 0)
+ pstate->prefetch_target *= 2;
+ else
+ pstate->prefetch_target++;
+ SpinLockRelease(&pstate->mutex);
+ }
#endif /* USE_PREFETCH */
}
@@ -417,23 +567,83 @@ static inline void
BitmapPrefetch(BitmapHeapScanState *node, HeapScanDesc scan)
{
#ifdef USE_PREFETCH
- TBMIterator *prefetch_iterator = node->prefetch_iterator;
+ ParallelBitmapHeapState *pstate = node->pstate;
- if (prefetch_iterator)
+ if (pstate == NULL)
{
- while (node->prefetch_pages < node->prefetch_target)
+ TBMIterator *prefetch_iterator = node->prefetch_iterator;
+
+ if (prefetch_iterator)
{
- TBMIterateResult *tbmpre = tbm_iterate(prefetch_iterator);
+ while (node->prefetch_pages < node->prefetch_target)
+ {
+ TBMIterateResult *tbmpre = tbm_iterate(prefetch_iterator);
+
+ if (tbmpre == NULL)
+ {
+ /* No more pages to prefetch */
+ tbm_end_iterate(prefetch_iterator);
+ node->prefetch_iterator = NULL;
+ break;
+ }
+ node->prefetch_pages++;
+ PrefetchBuffer(scan->rs_rd, MAIN_FORKNUM, tbmpre->blockno);
+ }
+ }
+
+ return;
+ }
+
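+ /* Quick unlocked check; rechecked under the mutex below */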
+ if (pstate->prefetch_pages < pstate->prefetch_target)
+ {
+ TBMSharedIterator *prefetch_iterator = node->shared_prefetch_iterator;
- if (tbmpre == NULL)
+ if (prefetch_iterator)
+ {
+ SpinLockAcquire(&pstate->mutex);
+
+ /* Recheck under the mutex */
+ if (!pstate->prefetching &&
+ pstate->prefetch_pages < pstate->prefetch_target)
{
- /* No more pages to prefetch */
- tbm_end_iterate(prefetch_iterator);
- node->prefetch_iterator = NULL;
- break;
+ /*
+ * If some other process is already doing the prefetching, let
+ * it continue and proceed with the work at hand. We could
+ * instead let all processes participate in the prefetching,
+ * but most of that work happens under the mutex or an LWLock,
+ * so we would likely end up prefetching sequentially anyway.
+ */
+ pstate->prefetching = true;
+ SpinLockRelease(&pstate->mutex);
+ do
+ {
+ TBMIterateResult *tbmpre;
+
+ tbmpre = tbm_shared_iterate(prefetch_iterator);
+ if (tbmpre == NULL)
+ {
+ /* No more pages to prefetch */
+ tbm_end_shared_iterate(prefetch_iterator);
+ node->shared_prefetch_iterator = NULL;
+ break;
+ }
+
+ SpinLockAcquire(&pstate->mutex);
+ pstate->prefetch_pages++;
+ if (pstate->prefetch_pages >= pstate->prefetch_target)
+ {
+ SpinLockRelease(&pstate->mutex);
+ break;
+ }
+ SpinLockRelease(&pstate->mutex);
+
+ PrefetchBuffer(scan->rs_rd, MAIN_FORKNUM, tbmpre->blockno);
+ } while (true);
+
+ /* Allow another process to take over the prefetching */
+ SpinLockAcquire(&pstate->mutex);
+ pstate->prefetching = false;
+ SpinLockRelease(&pstate->mutex);
}
- node->prefetch_pages++;
- PrefetchBuffer(scan->rs_rd, MAIN_FORKNUM, tbmpre->blockno);
+ else
+ SpinLockRelease(&pstate->mutex);
}
}
#endif /* USE_PREFETCH */
@@ -488,12 +698,36 @@ ExecReScanBitmapHeapScan(BitmapHeapScanState *node)
tbm_end_iterate(node->tbmiterator);
if (node->prefetch_iterator)
tbm_end_iterate(node->prefetch_iterator);
+ if (node->shared_tbmiterator)
+ tbm_end_shared_iterate(node->shared_tbmiterator);
+ if (node->shared_prefetch_iterator)
+ tbm_end_shared_iterate(node->shared_prefetch_iterator);
if (node->tbm)
tbm_free(node->tbm);
node->tbm = NULL;
node->tbmiterator = NULL;
node->tbmres = NULL;
node->prefetch_iterator = NULL;
+ node->initialized = false;
+ node->shared_tbmiterator = NULL;
+ node->shared_prefetch_iterator = NULL;
+
+ /* Reset parallel bitmap state, if present */
+ if (node->pstate)
+ {
+ dsa_area *dsa = node->ss.ps.state->es_query_dsa;
+
+ node->pstate->state = BM_INITIAL;
+
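+ /*
+ * The shared bitmap is released through the main iterator, so the
+ * prefetch iterator only needs its own state freed.
+ */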
+ if (DsaPointerIsValid(node->pstate->tbmiterator))
+ tbm_free_shared_area(dsa, node->pstate->tbmiterator);
+
+ if (DsaPointerIsValid(node->pstate->prefetch_iterator))
+ dsa_free(dsa, node->pstate->prefetch_iterator);
+
+ node->pstate->tbmiterator = InvalidDsaPointer;
+ node->pstate->prefetch_iterator = InvalidDsaPointer;
+ }
ExecScanReScan(&node->ss);
@@ -546,6 +780,10 @@ ExecEndBitmapHeapScan(BitmapHeapScanState *node)
tbm_end_iterate(node->prefetch_iterator);
if (node->tbm)
tbm_free(node->tbm);
+ if (node->shared_tbmiterator)
+ tbm_end_shared_iterate(node->shared_tbmiterator);
+ if (node->shared_prefetch_iterator)
+ tbm_end_shared_iterate(node->shared_prefetch_iterator);
/*
* close heap scan
@@ -597,6 +835,10 @@ ExecInitBitmapHeapScan(BitmapHeapScan *node, EState *estate, int eflags)
scanstate->prefetch_target = 0;
/* may be updated below */
scanstate->prefetch_maximum = target_prefetch_pages;
+ scanstate->pscan_len = 0;
+ scanstate->initialized = false;
+ scanstate->shared_tbmiterator = NULL;
+ scanstate->shared_prefetch_iterator = NULL;
+ scanstate->pstate = NULL;
/*
* Miscellaneous initialization
@@ -681,3 +923,127 @@ ExecInitBitmapHeapScan(BitmapHeapScan *node, EState *estate, int eflags)
*/
return scanstate;
}
+
+/*----------------
+ * BitmapShouldInitializeSharedState
+ *
+ * The first process to come here and see the state BM_INITIAL
+ * becomes the leader for the parallel bitmap scan and is
+ * responsible for populating the TIDBitmap. The other processes
+ * will be blocked on the condition variable until the leader
+ * wakes them up.
+ * ----------------
+ */
+static bool
+BitmapShouldInitializeSharedState(ParallelBitmapHeapState *pstate)
+{
+ bool needWait = false;
+ bool leader = false;
+
+ while (1)
+ {
+ /*---------------
+ * Check the current state
+ * If state is
+ * BM_INITIAL : We become the leader and set it to BM_INPROGRESS
+ * BM_INPROGRESS : We need to wait until the leader creates the bitmap
+ * BM_FINISHED : The bitmap is ready, so there is no need to wait
+ *---------------
+ */
+ SpinLockAcquire(&pstate->mutex);
+
+ if (pstate->state == BM_INITIAL)
+ {
+ pstate->state = BM_INPROGRESS;
+ leader = true;
+ }
+ else if (pstate->state == BM_INPROGRESS)
+ needWait = true;
+ else
+ needWait = false;
+
+ SpinLockRelease(&pstate->mutex);
+
+ /* If we are the leader, or the leader has already created the TIDBitmap */
+ if (leader || !needWait)
+ break;
+
+ /* Sleep until the leader sends the wakeup signal */
+ ConditionVariableSleep(&pstate->cv, WAIT_EVENT_PARALLEL_BITMAP_SCAN);
+ }
+
+ ConditionVariableCancelSleep();
+
+ return leader;
+}
+
+/* ----------------------------------------------------------------
+ * ExecBitmapHeapEstimate
+ *
+ * Estimate the space required to serialize the bitmap scan node.
+ * ----------------------------------------------------------------
+ */
+void
+ExecBitmapHeapEstimate(BitmapHeapScanState *node,
+ ParallelContext *pcxt)
+{
+ EState *estate = node->ss.ps.state;
+
+ /* Estimate the size of the shared parallel bitmap state. */
+ node->pscan_len = add_size(offsetof(ParallelBitmapHeapState,
+ phs_snapshot_data),
+ EstimateSnapshotSpace(estate->es_snapshot));
+
+ shm_toc_estimate_chunk(&pcxt->estimator, node->pscan_len);
+ shm_toc_estimate_keys(&pcxt->estimator, 1);
+}
+
+/* ----------------------------------------------------------------
+ * ExecBitmapHeapInitializeDSM
+ *
+ * Set up a parallel bitmap heap scan descriptor.
+ * ----------------------------------------------------------------
+ */
+void
+ExecBitmapHeapInitializeDSM(BitmapHeapScanState *node,
+ ParallelContext *pcxt)
+{
+ ParallelBitmapHeapState *pstate;
+ EState *estate = node->ss.ps.state;
+
+ pstate = shm_toc_allocate(pcxt->toc, node->pscan_len);
+
+ pstate->tbmiterator = InvalidDsaPointer;
+ pstate->prefetch_iterator = InvalidDsaPointer;
+
+ /* Initialize the mutex */
+ SpinLockInit(&pstate->mutex);
+ pstate->prefetch_pages = 0;
+ pstate->prefetch_target = 0;
+ pstate->prefetching = false;
+ pstate->state = BM_INITIAL;
+
+ ConditionVariableInit(&pstate->cv);
+ SerializeSnapshot(estate->es_snapshot, pstate->phs_snapshot_data);
+
+ shm_toc_insert(pcxt->toc, node->ss.ps.plan->plan_node_id, pstate);
+ node->pstate = pstate;
+}
+
+/* ----------------------------------------------------------------
+ * ExecBitmapHeapInitializeWorker
+ *
+ * Copy relevant information from TOC into planstate.
+ * ----------------------------------------------------------------
+ */
+void
+ExecBitmapHeapInitializeWorker(BitmapHeapScanState *node, shm_toc *toc)
+{
+ ParallelBitmapHeapState *pstate;
+ Snapshot snapshot;
+
+ pstate = shm_toc_lookup(toc, node->ss.ps.plan->plan_node_id);
+ node->pstate = pstate;
+
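+ /* Install the snapshot serialized by the leader into our scan */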
+ snapshot = RestoreSnapshot(pstate->phs_snapshot_data);
+ heap_update_snapshot(node->ss.ss_currentScanDesc, snapshot);
+}
diff --git a/src/backend/executor/nodeBitmapIndexscan.c b/src/backend/executor/nodeBitmapIndexscan.c
index 94bb289..ce2f321 100644
--- a/src/backend/executor/nodeBitmapIndexscan.c
+++ b/src/backend/executor/nodeBitmapIndexscan.c
@@ -78,7 +78,9 @@ MultiExecBitmapIndexScan(BitmapIndexScanState *node)
else
{
/* XXX should we use less than work_mem for this? */
- tbm = tbm_create(work_mem * 1024L, NULL);
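+ /* Create the bitmap in the per-query DSA area if it must be shared */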
+ tbm = tbm_create(work_mem * 1024L,
+ ((BitmapIndexScan *) node->ss.ps.plan)->isshared ?
+ node->ss.ps.state->es_query_dsa : NULL);
}
/*
diff --git a/src/backend/executor/nodeBitmapOr.c b/src/backend/executor/nodeBitmapOr.c
index 1d280be..c0f2614 100644
--- a/src/backend/executor/nodeBitmapOr.c
+++ b/src/backend/executor/nodeBitmapOr.c
@@ -129,7 +129,9 @@ MultiExecBitmapOr(BitmapOrState *node)
if (result == NULL) /* first subplan */
{
/* XXX should we use less than work_mem for this? */
- result = tbm_create(work_mem * 1024L, NULL);
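+ /* A shared bitmap must likewise live in the per-query DSA area */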
+ result = tbm_create(work_mem * 1024L,
+ ((BitmapOr *) node->ps.plan)->isshared ?
+ node->ps.state->es_query_dsa : NULL);
}
((BitmapIndexScanState *) subnode)->biss_result = result;
diff --git a/src/backend/nodes/copyfuncs.c b/src/backend/nodes/copyfuncs.c
index 05d8538..89afc7a 100644
--- a/src/backend/nodes/copyfuncs.c
+++ b/src/backend/nodes/copyfuncs.c
@@ -331,6 +331,7 @@ _copyBitmapOr(const BitmapOr *from)
/*
* copy remainder of node
*/
+ COPY_SCALAR_FIELD(isshared);
COPY_NODE_FIELD(bitmapplans);
return newnode;
@@ -496,6 +497,7 @@ _copyBitmapIndexScan(const BitmapIndexScan *from)
* copy remainder of node
*/
COPY_SCALAR_FIELD(indexid);
+ COPY_SCALAR_FIELD(isshared);
COPY_NODE_FIELD(indexqual);
COPY_NODE_FIELD(indexqualorig);
diff --git a/src/backend/nodes/outfuncs.c b/src/backend/nodes/outfuncs.c
index b3802b4..0ec4e1b 100644
--- a/src/backend/nodes/outfuncs.c
+++ b/src/backend/nodes/outfuncs.c
@@ -441,6 +441,7 @@ _outBitmapOr(StringInfo str, const BitmapOr *node)
_outPlanInfo(str, (const Plan *) node);
+ WRITE_BOOL_FIELD(isshared);
WRITE_NODE_FIELD(bitmapplans);
}
@@ -520,6 +521,7 @@ _outBitmapIndexScan(StringInfo str, const BitmapIndexScan *node)
_outScanInfo(str, (const Scan *) node);
WRITE_OID_FIELD(indexid);
+ WRITE_BOOL_FIELD(isshared);
WRITE_NODE_FIELD(indexqual);
WRITE_NODE_FIELD(indexqualorig);
}
diff --git a/src/backend/nodes/readfuncs.c b/src/backend/nodes/readfuncs.c
index 05bf2e9..d0d68cc 100644
--- a/src/backend/nodes/readfuncs.c
+++ b/src/backend/nodes/readfuncs.c
@@ -1605,6 +1605,7 @@ _readBitmapOr(void)
ReadCommonPlan(&local_node->plan);
+ READ_BOOL_FIELD(isshared);
READ_NODE_FIELD(bitmapplans);
READ_DONE();
@@ -1716,6 +1717,7 @@ _readBitmapIndexScan(void)
ReadCommonScan(&local_node->scan);
READ_OID_FIELD(indexid);
+ READ_BOOL_FIELD(isshared);
READ_NODE_FIELD(indexqual);
READ_NODE_FIELD(indexqualorig);
diff --git a/src/backend/optimizer/path/allpaths.c b/src/backend/optimizer/path/allpaths.c
index 633b5c1..1fef1bd 100644
--- a/src/backend/optimizer/path/allpaths.c
+++ b/src/backend/optimizer/path/allpaths.c
@@ -2875,6 +2875,30 @@ remove_unused_subquery_outputs(Query *subquery, RelOptInfo *rel)
}
/*
+ * create_partial_bitmap_paths
+ * Build partial bitmap heap path for the relation
+ */
+void
+create_partial_bitmap_paths(PlannerInfo *root, RelOptInfo *rel,
+ Path *bitmapqual)
+{
+ int parallel_workers;
+ double pages_fetched;
+
+ /* Compute heap pages for bitmap heap scan */
+ pages_fetched = compute_bitmap_pages(root, rel, bitmapqual, 1.0,
+ NULL, NULL);
+
+ parallel_workers = compute_parallel_worker(rel, pages_fetched, 0);
+
+ if (parallel_workers <= 0)
+ return;
+
+ add_partial_path(rel, (Path *) create_bitmap_heap_path(root, rel,
+ bitmapqual, rel->lateral_relids, 1.0, parallel_workers));
+}
+
+/*
* Compute the number of parallel workers that should be used to scan a
* relation. We compute the parallel workers based on the size of the heap to
* be scanned and the size of the index to be scanned, then choose a minimum
diff --git a/src/backend/optimizer/path/costsize.c b/src/backend/optimizer/path/costsize.c
index c138f57..b8cde32 100644
--- a/src/backend/optimizer/path/costsize.c
+++ b/src/backend/optimizer/path/costsize.c
@@ -860,6 +860,7 @@ cost_bitmap_heap_scan(Path *path, PlannerInfo *root, RelOptInfo *baserel,
QualCost qpqual_cost;
Cost cpu_per_tuple;
Cost cost_per_page;
+ Cost cpu_run_cost;
double tuples_fetched;
double pages_fetched;
double spc_seq_page_cost,
@@ -921,8 +922,21 @@ cost_bitmap_heap_scan(Path *path, PlannerInfo *root, RelOptInfo *baserel,
startup_cost += qpqual_cost.startup;
cpu_per_tuple = cpu_tuple_cost + qpqual_cost.per_tuple;
+ cpu_run_cost = cpu_per_tuple * tuples_fetched;
+
+ /* Adjust costing for parallelism, if used. */
+ if (path->parallel_workers > 0)
+ {
+ double parallel_divisor = get_parallel_divisor(path);
+
+ /* The CPU cost is divided among all the workers. */
+ cpu_run_cost /= parallel_divisor;
- run_cost += cpu_per_tuple * tuples_fetched;
+ path->rows = clamp_row_est(path->rows / parallel_divisor);
+ }
+
+ run_cost += cpu_run_cost;
/* tlist eval costs are paid per output row, not per tuple scanned */
startup_cost += path->pathtarget->cost.startup;
diff --git a/src/backend/optimizer/path/indxpath.c b/src/backend/optimizer/path/indxpath.c
index d8e5b81..c2b72d4 100644
--- a/src/backend/optimizer/path/indxpath.c
+++ b/src/backend/optimizer/path/indxpath.c
@@ -337,8 +337,12 @@ create_index_paths(PlannerInfo *root, RelOptInfo *rel)
bitmapqual = choose_bitmap_and(root, rel, bitindexpaths);
bpath = create_bitmap_heap_path(root, rel, bitmapqual,
- rel->lateral_relids, 1.0);
+ rel->lateral_relids, 1.0, 0);
add_path(rel, (Path *) bpath);
+
+ /* create a partial bitmap heap path */
+ if (rel->consider_parallel && rel->lateral_relids == NULL)
+ create_partial_bitmap_paths(root, rel, bitmapqual);
}
/*
@@ -410,7 +414,7 @@ create_index_paths(PlannerInfo *root, RelOptInfo *rel)
required_outer = get_bitmap_tree_required_outer(bitmapqual);
loop_count = get_loop_count(root, rel->relid, required_outer);
bpath = create_bitmap_heap_path(root, rel, bitmapqual,
- required_outer, loop_count);
+ required_outer, loop_count, 0);
add_path(rel, (Path *) bpath);
}
}
@@ -1617,6 +1621,11 @@ bitmap_scan_cost_est(PlannerInfo *root, RelOptInfo *rel, Path *ipath)
bpath.path.pathkeys = NIL;
bpath.bitmapqual = ipath;
+ /*
+ * Cost the temporary path without considering parallelism; the
+ * parallel bitmap heap path will be considered at a later stage.
+ */
+ bpath.path.parallel_workers = 0;
cost_bitmap_heap_scan(&bpath.path, root, rel,
bpath.path.param_info,
ipath,
@@ -1659,6 +1668,12 @@ bitmap_and_cost_est(PlannerInfo *root, RelOptInfo *rel, List *paths)
bpath.path.pathkeys = NIL;
bpath.bitmapqual = (Path *) &apath;
+ /*
+ * Cost the temporary path without considering parallelism; the
+ * parallel bitmap heap path will be considered at a later stage.
+ */
+ bpath.path.parallel_workers = 0;
+
/* Now we can do cost_bitmap_heap_scan */
cost_bitmap_heap_scan(&bpath.path, root, rel,
bpath.path.param_info,
diff --git a/src/backend/optimizer/plan/createplan.c b/src/backend/optimizer/plan/createplan.c
index 1e953b4..eaf85da 100644
--- a/src/backend/optimizer/plan/createplan.c
+++ b/src/backend/optimizer/plan/createplan.c
@@ -125,6 +125,7 @@ static BitmapHeapScan *create_bitmap_scan_plan(PlannerInfo *root,
List *tlist, List *scan_clauses);
static Plan *create_bitmap_subplan(PlannerInfo *root, Path *bitmapqual,
List **qual, List **indexqual, List **indexECs);
+static void bitmap_subplan_mark_shared(Plan *plan);
static TidScan *create_tidscan_plan(PlannerInfo *root, TidPath *best_path,
List *tlist, List *scan_clauses);
static SubqueryScan *create_subqueryscan_plan(PlannerInfo *root,
@@ -2577,6 +2578,9 @@ create_bitmap_scan_plan(PlannerInfo *root,
&bitmapqualorig, &indexquals,
&indexECs);
+ if (best_path->path.parallel_aware)
+ bitmap_subplan_mark_shared(bitmapqualplan);
+
/*
* The qpqual list must contain all restrictions not automatically handled
* by the index, other than pseudoconstant clauses which will be handled
@@ -4700,6 +4705,24 @@ label_sort_with_costsize(PlannerInfo *root, Sort *plan, double limit_tuples)
plan->plan.parallel_aware = false;
}
+/*
+ * bitmap_subplan_mark_shared
+ * Set the shared flag in the bitmap subplan so that it creates the
+ * underlying bitmap in shared memory, which is accessible by multiple
+ * processes.
+ */
+static void
+bitmap_subplan_mark_shared(Plan *plan)
+{
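+ /*
+ * For a BitmapAnd, the result bitmap comes from the first subplan, so
+ * marking that one shared is sufficient.
+ */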
+ if (IsA(plan, BitmapAnd))
+ bitmap_subplan_mark_shared(
+ linitial(((BitmapAnd *) plan)->bitmapplans));
+ else if (IsA(plan, BitmapOr))
+ ((BitmapOr *) plan)->isshared = true;
+ else if (IsA(plan, BitmapIndexScan))
+ ((BitmapIndexScan *) plan)->isshared = true;
+ else
+ elog(ERROR, "unrecognized node type: %d", nodeTag(plan));
+}
/*****************************************************************************
*
diff --git a/src/backend/optimizer/util/pathnode.c b/src/backend/optimizer/util/pathnode.c
index 3248296..6e70808 100644
--- a/src/backend/optimizer/util/pathnode.c
+++ b/src/backend/optimizer/util/pathnode.c
@@ -1068,7 +1068,8 @@ create_bitmap_heap_path(PlannerInfo *root,
RelOptInfo *rel,
Path *bitmapqual,
Relids required_outer,
- double loop_count)
+ double loop_count,
+ int parallel_degree)
{
BitmapHeapPath *pathnode = makeNode(BitmapHeapPath);
@@ -1077,9 +1078,9 @@ create_bitmap_heap_path(PlannerInfo *root,
pathnode->path.pathtarget = rel->reltarget;
pathnode->path.param_info = get_baserel_parampathinfo(root, rel,
required_outer);
- pathnode->path.parallel_aware = false;
+ pathnode->path.parallel_aware = (parallel_degree > 0);
pathnode->path.parallel_safe = rel->consider_parallel;
- pathnode->path.parallel_workers = 0;
+ pathnode->path.parallel_workers = parallel_degree;
pathnode->path.pathkeys = NIL; /* always unordered */
pathnode->bitmapqual = bitmapqual;
@@ -3255,7 +3256,7 @@ reparameterize_path(PlannerInfo *root, Path *path,
rel,
bpath->bitmapqual,
required_outer,
- loop_count);
+ loop_count, 0);
}
case T_SubqueryScan:
{
diff --git a/src/backend/postmaster/pgstat.c b/src/backend/postmaster/pgstat.c
index 2fb9a8b..7cacb1e 100644
--- a/src/backend/postmaster/pgstat.c
+++ b/src/backend/postmaster/pgstat.c
@@ -3395,6 +3395,9 @@ pgstat_get_wait_ipc(WaitEventIPC w)
case WAIT_EVENT_PARALLEL_FINISH:
event_name = "ParallelFinish";
break;
+ case WAIT_EVENT_PARALLEL_BITMAP_SCAN:
+ event_name = "ParallelBitmapScan";
+ break;
case WAIT_EVENT_SAFE_SNAPSHOT:
event_name = "SafeSnapshot";
break;
diff --git a/src/include/access/heapam.h b/src/include/access/heapam.h
index a864f78..7e85510 100644
--- a/src/include/access/heapam.h
+++ b/src/include/access/heapam.h
@@ -179,6 +179,7 @@ extern void simple_heap_update(Relation relation, ItemPointer otid,
HeapTuple tup);
extern void heap_sync(Relation relation);
+extern void heap_update_snapshot(HeapScanDesc scan, Snapshot snapshot);
/* in heap/pruneheap.c */
extern void heap_page_prune_opt(Relation relation, Buffer buffer);
diff --git a/src/include/executor/nodeBitmapHeapscan.h b/src/include/executor/nodeBitmapHeapscan.h
index d7659b9..465c58e 100644
--- a/src/include/executor/nodeBitmapHeapscan.h
+++ b/src/include/executor/nodeBitmapHeapscan.h
@@ -15,10 +15,17 @@
#define NODEBITMAPHEAPSCAN_H
+#include "access/parallel.h"
#include "nodes/execnodes.h"
extern BitmapHeapScanState *ExecInitBitmapHeapScan(BitmapHeapScan *node, EState *estate, int eflags);
extern TupleTableSlot *ExecBitmapHeapScan(BitmapHeapScanState *node);
extern void ExecEndBitmapHeapScan(BitmapHeapScanState *node);
extern void ExecReScanBitmapHeapScan(BitmapHeapScanState *node);
+extern void ExecBitmapHeapEstimate(BitmapHeapScanState *node,
+ ParallelContext *pcxt);
+extern void ExecBitmapHeapInitializeDSM(BitmapHeapScanState *node,
+ ParallelContext *pcxt);
+extern void ExecBitmapHeapInitializeWorker(BitmapHeapScanState *node,
+ shm_toc *toc);
#endif /* NODEBITMAPHEAPSCAN_H */
diff --git a/src/include/nodes/execnodes.h b/src/include/nodes/execnodes.h
index 6332ea0..fc18f38 100644
--- a/src/include/nodes/execnodes.h
+++ b/src/include/nodes/execnodes.h
@@ -26,6 +26,8 @@
#include "utils/sortsupport.h"
#include "utils/tuplestore.h"
#include "utils/tuplesort.h"
+#include "nodes/tidbitmap.h"
+#include "storage/condition_variable.h"
/* ----------------
@@ -1465,6 +1467,54 @@ typedef struct BitmapIndexScanState
} BitmapIndexScanState;
/* ----------------
+ * SharedBitmapState information
+ *
+ * BM_INITIAL TIDBitmap creation is not yet started, so the first
+ * worker to see this state will set it to BM_INPROGRESS,
+ * and that worker will be responsible for creating the
+ * TIDBitmap.
+ * BM_INPROGRESS TIDBitmap creation is already in progress; therefore
+ * workers need to sleep until the leader sets the state to
+ * BM_FINISHED and wakes them up.
+ * BM_FINISHED TIDBitmap creation is done, so all workers can proceed
+ * to iterate over the TIDBitmap.
+ * ----------------
+ */
+typedef enum
+{
+ BM_INITIAL,
+ BM_INPROGRESS,
+ BM_FINISHED
+} SharedBitmapState;
+
+/* ----------------
+ * ParallelBitmapHeapState information
+ * tbmiterator iterator for scanning current pages
+ * prefetch_iterator iterator for prefetching ahead of current page
+ * mutex mutual exclusion for the prefetch fields and state
+ * prefetch_pages # pages prefetch iterator is ahead of current
+ * prefetch_target current target prefetch distance
+ * prefetching set true if prefetching is in progress
+ * state current state of the TIDBitmap
+ * cv condition variable used to synchronize workers
+ * phs_snapshot_data snapshot data shared with the workers
+ * ----------------
+ */
+typedef struct ParallelBitmapHeapState
+{
+ dsa_pointer tbmiterator;
+ dsa_pointer prefetch_iterator;
+ slock_t mutex;
+ int prefetch_pages;
+ int prefetch_target;
+ bool prefetching;
+ SharedBitmapState state;
+ ConditionVariable cv;
+ char phs_snapshot_data[FLEXIBLE_ARRAY_MEMBER];
+} ParallelBitmapHeapState;
+
+/* ----------------
* BitmapHeapScanState information
*
* bitmapqualorig execution state for bitmapqualorig expressions
@@ -1477,6 +1527,11 @@ typedef struct BitmapIndexScanState
* prefetch_pages # pages prefetch iterator is ahead of current
* prefetch_target current target prefetch distance
* prefetch_maximum maximum value for prefetch_target
+ * pscan_len size of the shared memory for parallel bitmap
+ * initialized true if the node is ready to iterate
+ * shared_tbmiterator shared iterator
+ * shared_prefetch_iterator shared iterator for prefetching
+ * pstate shared state for parallel bitmap scan
* ----------------
*/
typedef struct BitmapHeapScanState
@@ -1492,6 +1547,11 @@ typedef struct BitmapHeapScanState
int prefetch_pages;
int prefetch_target;
int prefetch_maximum;
+ Size pscan_len;
+ bool initialized;
+ TBMSharedIterator *shared_tbmiterator;
+ TBMSharedIterator *shared_prefetch_iterator;
+ ParallelBitmapHeapState *pstate;
} BitmapHeapScanState;
/* ----------------
diff --git a/src/include/nodes/plannodes.h b/src/include/nodes/plannodes.h
index f72f7a8..7f38ca6 100644
--- a/src/include/nodes/plannodes.h
+++ b/src/include/nodes/plannodes.h
@@ -292,6 +292,7 @@ typedef struct BitmapAnd
typedef struct BitmapOr
{
Plan plan;
+ bool isshared;
List *bitmapplans;
} BitmapOr;
@@ -420,6 +421,7 @@ typedef struct BitmapIndexScan
{
Scan scan;
Oid indexid; /* OID of index to scan */
+ bool isshared; /* Create shared bitmap if set */
List *indexqual; /* list of index quals (OpExprs) */
List *indexqualorig; /* the same in original form */
} BitmapIndexScan;
diff --git a/src/include/optimizer/pathnode.h b/src/include/optimizer/pathnode.h
index 53cad24..d22b039 100644
--- a/src/include/optimizer/pathnode.h
+++ b/src/include/optimizer/pathnode.h
@@ -53,7 +53,8 @@ extern BitmapHeapPath *create_bitmap_heap_path(PlannerInfo *root,
RelOptInfo *rel,
Path *bitmapqual,
Relids required_outer,
- double loop_count);
+ double loop_count,
+ int parallel_degree);
extern BitmapAndPath *create_bitmap_and_path(PlannerInfo *root,
RelOptInfo *rel,
List *bitmapquals);
diff --git a/src/include/optimizer/paths.h b/src/include/optimizer/paths.h
index ebda308..3ec2d9d 100644
--- a/src/include/optimizer/paths.h
+++ b/src/include/optimizer/paths.h
@@ -56,6 +56,8 @@ extern RelOptInfo *standard_join_search(PlannerInfo *root, int levels_needed,
extern void generate_gather_paths(PlannerInfo *root, RelOptInfo *rel);
extern int compute_parallel_worker(RelOptInfo *rel, BlockNumber heap_pages,
BlockNumber index_pages);
+extern void create_partial_bitmap_paths(PlannerInfo *root, RelOptInfo *rel,
+ Path *bitmapqual);
#ifdef OPTIMIZER_DEBUG
extern void debug_print_rel(PlannerInfo *root, RelOptInfo *rel);
diff --git a/src/include/pgstat.h b/src/include/pgstat.h
index 0062fb8..60c78d1 100644
--- a/src/include/pgstat.h
+++ b/src/include/pgstat.h
@@ -787,6 +787,7 @@ typedef enum
WAIT_EVENT_MQ_RECEIVE,
WAIT_EVENT_MQ_SEND,
WAIT_EVENT_PARALLEL_FINISH,
+ WAIT_EVENT_PARALLEL_BITMAP_SCAN,
WAIT_EVENT_SAFE_SNAPSHOT,
WAIT_EVENT_SYNC_REP
} WaitEventIPC;