diff --git a/doc/src/sgml/monitoring.sgml b/doc/src/sgml/monitoring.sgml
index 5b67def..82c7c87 100644
--- a/doc/src/sgml/monitoring.sgml
+++ b/doc/src/sgml/monitoring.sgml
@@ -1240,6 +1240,10 @@ postgres 27093 0.0 0.0 30096 2752 ? Ss 11:34 0:00 postgres: ser
Waiting for parallel workers to finish computing.
+ ParallelBitmapScan
+ Waiting for leader backend to complete the TidBitmap.
+
+
SafeSnapshot
Waiting for a snapshot for a READ ONLY DEFERRABLE transaction.
diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c
index 0be48fb..82805d6 100644
--- a/src/backend/access/heap/heapam.c
+++ b/src/backend/access/heap/heapam.c
@@ -1754,6 +1754,22 @@ retry:
}
/* ----------------
+ * heap_update_snapshot
+ *
+ * Update snapshot info in heap scan descriptor.
+ * ----------------
+ */
+void
+heap_update_snapshot(HeapScanDesc scan, Snapshot snapshot)
+{
+ Assert(IsMVCCSnapshot(snapshot));
+
+ RegisterSnapshot(snapshot);
+ scan->rs_snapshot = snapshot;
+ scan->rs_temp_snap = true;
+}
+
+/* ----------------
* heap_getnext - retrieve next tuple in scan
*
* Fix to work with index relations.
diff --git a/src/backend/executor/execParallel.c b/src/backend/executor/execParallel.c
index fe87c9a..a2593be 100644
--- a/src/backend/executor/execParallel.c
+++ b/src/backend/executor/execParallel.c
@@ -25,6 +25,7 @@
#include "executor/execParallel.h"
#include "executor/executor.h"
+#include "executor/nodeBitmapHeapscan.h"
#include "executor/nodeCustom.h"
#include "executor/nodeForeignscan.h"
#include "executor/nodeSeqscan.h"
@@ -205,6 +206,10 @@ ExecParallelEstimate(PlanState *planstate, ExecParallelEstimateContext *e)
ExecCustomScanEstimate((CustomScanState *) planstate,
e->pcxt);
break;
+ case T_BitmapHeapScanState:
+ ExecBitmapHeapEstimate((BitmapHeapScanState *) planstate,
+ e->pcxt);
+ break;
default:
break;
}
@@ -257,6 +262,11 @@ ExecParallelInitializeDSM(PlanState *planstate,
ExecCustomScanInitializeDSM((CustomScanState *) planstate,
d->pcxt);
break;
+ case T_BitmapHeapScanState:
+ ExecBitmapHeapInitializeDSM((BitmapHeapScanState *) planstate,
+ d->pcxt);
+ break;
+
default:
break;
}
@@ -733,6 +743,10 @@ ExecParallelInitializeWorker(PlanState *planstate, shm_toc *toc)
ExecCustomScanInitializeWorker((CustomScanState *) planstate,
toc);
break;
+ case T_BitmapHeapScanState:
+ ExecBitmapHeapInitializeWorker(
+ (BitmapHeapScanState *) planstate, toc);
+ break;
default:
break;
}
diff --git a/src/backend/executor/nodeBitmapHeapscan.c b/src/backend/executor/nodeBitmapHeapscan.c
index f18827d..91e032e 100644
--- a/src/backend/executor/nodeBitmapHeapscan.c
+++ b/src/backend/executor/nodeBitmapHeapscan.c
@@ -48,10 +48,14 @@
#include "utils/snapmgr.h"
#include "utils/tqual.h"
+#include "miscadmin.h"
+#include "pgstat.h"
static TupleTableSlot *BitmapHeapNext(BitmapHeapScanState *node);
static void bitgetpage(HeapScanDesc scan, TBMIterateResult *tbmres);
+static bool pbms_is_leader(ParallelBitmapInfo *pbminfo);
+static void pbms_set_parallel(PlanState *node);
/* ----------------------------------------------------------------
* BitmapHeapNext
@@ -66,13 +70,16 @@ BitmapHeapNext(BitmapHeapScanState *node)
HeapScanDesc scan;
TIDBitmap *tbm;
TBMIterator *tbmiterator;
+ TBMSharedIterator *stbmiterator;
TBMIterateResult *tbmres;
-
+ ParallelBitmapInfo *shared_info = node->parallel_bitmap;
#ifdef USE_PREFETCH
TBMIterator *prefetch_iterator;
+ TBMSharedIterator *sprefetch_iterator;
#endif
OffsetNumber targoffset;
TupleTableSlot *slot;
+ dsa_area *dsa = node->ss.ps.state->es_query_dsa;
/*
* extract necessary information from index scan node
@@ -81,12 +88,45 @@ BitmapHeapNext(BitmapHeapScanState *node)
slot = node->ss.ss_ScanTupleSlot;
scan = node->ss.ss_currentScanDesc;
tbm = node->tbm;
+
tbmiterator = node->tbmiterator;
+ stbmiterator = node->stbmiterator;
+
tbmres = node->tbmres;
#ifdef USE_PREFETCH
prefetch_iterator = node->prefetch_iterator;
+ sprefetch_iterator = node->sprefetch_iterator;
#endif
+ /* --------------------------------------------------------------------
+ * Parallel Bitmap Heap Scan Algorithm
+ *
+ * The first worker to see the state of ParallelBitmapInfo as PBM_INITIAL
+ * becomes leader and sets the state to PBM_INPROGRESS. All other workers
+ * see the state as PBM_INPROGRESS, and will wait for leader to finish
+ * building the TIDBitmap.
+ *
+ * Leader Processing:
+ * Create TIDBitmap using DSA memory.
+ * Restore local TIDBitmap variable information into
+ * ParallelBitmapInfo so that other worker can see those.
+ * Convert the TIDBitmap into shared chunk and page array
+ * Set state to PBM_FINISHED.
+ * Wake up other workers.
+ *
+ * Other Worker Processing:
+ * Wait until leader creates shared TIDBitmap.
+ * Copy TIDBitmap from info from ParallelBitmapInfo to local TIDBitmap.
+ *
+ * Iterate and process the pages.
+ * a) In this phase each worker will iterate over shared page and chunk
+ * array and select heap pages one by one.
+ * b) Since multiple workers are iterating over same page and chunk
+ * array we need to have a shared iterator, so we grab a LWLock
+ * and iterate within a lock.
+ * ----------------------------------------------------------------
+ */
+
/*
* If we haven't yet performed the underlying index scan, do it, and begin
* the iteration over the bitmap.
@@ -101,21 +141,90 @@ BitmapHeapNext(BitmapHeapScanState *node)
*/
if (tbm == NULL)
{
- tbm = (TIDBitmap *) MultiExecProcNode(outerPlanState(node));
+ /*
+ * Process the lower level node only if either we are running in non
+ * parallel mode or we are leader. In parallel mode leader will
+ * immediately come out of the function, but all other workers will
+ * be blocked until leader wake them up.
+ */
+ if (shared_info == NULL || pbms_is_leader(shared_info))
+ {
+ /*
+ * If we are in parallel mode recursively process the outer node
+ * and set parallel flag. This flag will be used to indicate the
+ * underlying tidbitmap layer to allocate pagetable elements from
+ * DSA.
+ */
+ if (shared_info)
+ pbms_set_parallel(outerPlanState(node));
- if (!tbm || !IsA(tbm, TIDBitmap))
- elog(ERROR, "unrecognized result from subplan");
+ tbm = (TIDBitmap *) MultiExecProcNode(outerPlanState(node));
+ if (!tbm || !IsA(tbm, TIDBitmap))
+ elog(ERROR, "unrecognized result from subplan");
+
+ /*
+ * In parallel mode, prepare the iterate over the TIDBitmap. This
+ * will return the dsa_pointer to the shared iterator state. Which
+ * will be used by multiple workers to iterate jointly.
+ */
+ if (shared_info)
+ {
+ shared_info->tbmiterator = tbm_prepare_shared_iterate(tbm);
+#ifdef USE_PREFETCH
+ if (node->prefetch_maximum > 0)
+ {
+ shared_info->prefetch_iterator =
+ tbm_prepare_shared_iterate(tbm);
+ }
+
+ /*
+ * By this time we have already created the shared iterator so it's
+ * time to wake up other workers.
+ */
+ SpinLockAcquire(&shared_info->state_mutex);
+ shared_info->state = PBM_FINISHED;
+ SpinLockRelease(&shared_info->state_mutex);
+
+ /* Wake up all other workers. */
+ ConditionVariableBroadcast(&shared_info->cv);
+#endif
+ }
+ }
+ else
+ tbm = tbm_create(work_mem * 1024L, dsa);
node->tbm = tbm;
- node->tbmiterator = tbmiterator = tbm_begin_iterate(tbm);
+
+ if (shared_info)
+ {
+ /*
+ * Allocate worker specific iterator and attach the shared
+ * iterator state to it.
+ */
+ node->stbmiterator = stbmiterator = tbm_attach_shared_iterate(
+ tbm, dsa, shared_info->tbmiterator);
+ }
+ else
+ node->tbmiterator = tbmiterator = tbm_begin_iterate(tbm);
+
node->tbmres = tbmres = NULL;
#ifdef USE_PREFETCH
if (node->prefetch_maximum > 0)
{
- node->prefetch_iterator = prefetch_iterator = tbm_begin_iterate(tbm);
- node->prefetch_pages = 0;
- node->prefetch_target = -1;
+ if (shared_info)
+ {
+ node->sprefetch_iterator = tbm_attach_shared_iterate(tbm, dsa,
+ shared_info->prefetch_iterator);
+ sprefetch_iterator = node->sprefetch_iterator;
+ }
+ else
+ {
+ node->prefetch_iterator = prefetch_iterator =
+ tbm_begin_iterate(tbm);
+ node->prefetch_pages = 0;
+ node->prefetch_target = -1;
+ }
}
#endif /* USE_PREFETCH */
}
@@ -124,13 +233,22 @@ BitmapHeapNext(BitmapHeapScanState *node)
{
Page dp;
ItemId lp;
+ bool need_prefetch = false;
+ int *prefetch_target;
/*
* Get next page of results if needed
*/
if (tbmres == NULL)
{
- node->tbmres = tbmres = tbm_iterate(tbmiterator);
+ /*
+ * If we are in parallel mode perform shared iterate otherwise
+ * local iterate.
+ */
+ if (!shared_info)
+ node->tbmres = tbmres = tbm_iterate(tbmiterator);
+ else
+ node->tbmres = tbmres = tbm_shared_iterate(stbmiterator);
if (tbmres == NULL)
{
/* no more entries in the bitmap */
@@ -138,17 +256,50 @@ BitmapHeapNext(BitmapHeapScanState *node)
}
#ifdef USE_PREFETCH
- if (node->prefetch_pages > 0)
+ /*
+ * If we are in parallel mode then acquire prefetch_mutex and
+ * check prefetch_pages from shared location.
+ */
+ if (shared_info)
{
+ SpinLockAcquire(&shared_info->prefetch_mutex);
+
/* The main iterator has closed the distance by one page */
- node->prefetch_pages--;
+ if (shared_info->prefetch_pages > 0)
+ shared_info->prefetch_pages--;
+ else
+ need_prefetch = true;
+ SpinLockRelease(&shared_info->prefetch_mutex);
}
- else if (prefetch_iterator)
+ else
{
- /* Do not let the prefetch iterator get behind the main one */
- TBMIterateResult *tbmpre = tbm_iterate(prefetch_iterator);
+ /* The main iterator has closed the distance by one page */
+ if (node->prefetch_pages > 0)
+ node->prefetch_pages--;
+ else
+ need_prefetch = true;
+ }
+
+ if (prefetch_iterator && need_prefetch)
+ {
+ TBMIterateResult *tbmpre;
- if (tbmpre == NULL || tbmpre->blockno != tbmres->blockno)
+ /* Do not let the prefetch iterator get behind the main one */
+ if (!shared_info)
+ tbmpre = tbm_iterate(prefetch_iterator);
+ else
+ tbmpre = tbm_shared_iterate(sprefetch_iterator);
+
+ /*
+ * In case of parallel mode we can only ensure that prefetch
+ * iterator is not behind the main iterator, but we can not
+ * ensure that the current blockno in the main iterator and
+ * in the prefetch iterator is same. It's possible that
+ * whatever blockno we are prefetching will be processed
+ * by another worker.
+ */
+ if ((shared_info == NULL) &&
+ (tbmpre == NULL || tbmpre->blockno != tbmres->blockno))
elog(ERROR, "prefetch and main iterators are out of sync");
}
#endif /* USE_PREFETCH */
@@ -183,19 +334,40 @@ BitmapHeapNext(BitmapHeapScanState *node)
#ifdef USE_PREFETCH
/*
- * Increase prefetch target if it's not yet at the max. Note that
- * we will increase it to zero after fetching the very first
- * page/tuple, then to one after the second tuple is fetched, then
- * it doubles as later pages are fetched.
+ * If we are in shared mode then use prefetch_target from shared
+ * location i.e pbminfo otherwise directly from node.
*/
- if (node->prefetch_target >= node->prefetch_maximum)
- /* don't increase any further */ ;
- else if (node->prefetch_target >= node->prefetch_maximum / 2)
- node->prefetch_target = node->prefetch_maximum;
- else if (node->prefetch_target > 0)
- node->prefetch_target *= 2;
+ if (shared_info == NULL)
+ prefetch_target = &node->prefetch_target;
else
- node->prefetch_target++;
+ prefetch_target = &shared_info->prefetch_target;
+
+ /* Increase prefetch target if it's not yet at the max. */
+ if (*prefetch_target < node->prefetch_maximum)
+ {
+ /* If we are in parallel mode then grab prefetch_mutex */
+ if (shared_info != NULL)
+ SpinLockAcquire(&shared_info->prefetch_mutex);
+
+ /*
+ * Increase prefetch target if it's not yet at the max. Note
+ * that we will increase it to zero after fetching the very
+ * first page/tuple, then to one after the second tuple is
+ * fetched, then it doubles as later pages are fetched.
+ */
+ if (*prefetch_target >= node->prefetch_maximum)
+ /* don't increase any further */ ;
+ else if (*prefetch_target >= node->prefetch_maximum / 2)
+ *prefetch_target = node->prefetch_maximum;
+ else if (*prefetch_target > 0)
+ *prefetch_target *= 2;
+ else
+ (*prefetch_target)++;
+
+ if (shared_info != NULL)
+ SpinLockRelease(&shared_info->prefetch_mutex);
+ }
+
#endif /* USE_PREFETCH */
}
else
@@ -211,8 +383,25 @@ BitmapHeapNext(BitmapHeapScanState *node)
* Try to prefetch at least a few pages even before we get to the
* second page if we don't stop reading after the first tuple.
*/
- if (node->prefetch_target < node->prefetch_maximum)
- node->prefetch_target++;
+ if (shared_info == NULL)
+ {
+ if (node->prefetch_target < node->prefetch_maximum)
+ node->prefetch_target++;
+ }
+ else
+ {
+ /*
+ * If we are in parallel mode then grab prefetch_mutex before
+ * updating prefetch target.
+ */
+ if (shared_info->prefetch_target < node->prefetch_maximum)
+ {
+ SpinLockAcquire(&shared_info->prefetch_mutex);
+ if (shared_info->prefetch_target < node->prefetch_maximum)
+ shared_info->prefetch_target++;
+ SpinLockRelease(&shared_info->prefetch_mutex);
+ }
+ }
#endif /* USE_PREFETCH */
}
@@ -236,21 +425,67 @@ BitmapHeapNext(BitmapHeapScanState *node)
*/
if (prefetch_iterator)
{
- while (node->prefetch_pages < node->prefetch_target)
+ int *prefetch_pages;
+ int prefetch_target;
+
+ /*
+ * If parallel bitmap info available means we are running in
+ * parallel mode. So use parallel iterator for prefetching.
+ */
+ if (shared_info)
+ {
+ prefetch_pages = &shared_info->prefetch_pages;
+ prefetch_target = shared_info->prefetch_target;
+ }
+ else
+ {
+ prefetch_pages = &node->prefetch_pages;
+ prefetch_target = node->prefetch_target;
+ }
+
+ /*
+ * We are checking the prefetch_pages without mutex. Henceforth,
+ * in parallel mode there can be some extra prefetch. Should
+ * we acquire mutex and recheck before iterating?
+ */
+ while (*prefetch_pages < prefetch_target)
{
- TBMIterateResult *tbmpre = tbm_iterate(prefetch_iterator);
+ TBMIterateResult *tbmpre;
+
+ if (!shared_info)
+ tbmpre = tbm_iterate(prefetch_iterator);
+ else
+ tbmpre = tbm_shared_iterate(sprefetch_iterator);
if (tbmpre == NULL)
{
- /* No more pages to prefetch */
- tbm_end_iterate(prefetch_iterator);
- node->prefetch_iterator = prefetch_iterator = NULL;
+ if (!shared_info)
+ {
+ /* No more pages to prefetch */
+ tbm_end_iterate(prefetch_iterator);
+ node->prefetch_iterator = prefetch_iterator = NULL;
+ }
+ else
+ {
+ /* No more pages to prefetch */
+ tbm_end_shared_iterate(sprefetch_iterator);
+ node->sprefetch_iterator = sprefetch_iterator = NULL;
+ }
break;
}
- node->prefetch_pages++;
+
+ if (shared_info != NULL)
+ SpinLockAcquire(&shared_info->prefetch_mutex);
+
+ (*prefetch_pages)++;
+
+ if (shared_info != NULL)
+ SpinLockRelease(&shared_info->prefetch_mutex);
+
PrefetchBuffer(scan->rs_rd, MAIN_FORKNUM, tbmpre->blockno);
}
}
+
#endif /* USE_PREFETCH */
/*
@@ -458,6 +693,10 @@ ExecReScanBitmapHeapScan(BitmapHeapScanState *node)
tbm_end_iterate(node->tbmiterator);
if (node->prefetch_iterator)
tbm_end_iterate(node->prefetch_iterator);
+ if (node->stbmiterator)
+ tbm_end_shared_iterate(node->stbmiterator);
+ if (node->sprefetch_iterator)
+ tbm_end_shared_iterate(node->sprefetch_iterator);
if (node->tbm)
tbm_free(node->tbm);
node->tbm = NULL;
@@ -465,6 +704,16 @@ ExecReScanBitmapHeapScan(BitmapHeapScanState *node)
node->tbmres = NULL;
node->prefetch_iterator = NULL;
+ /* reset parallel bitmap scan info, if present */
+ if (node->parallel_bitmap)
+ {
+ ParallelBitmapInfo *pbminfo = node->parallel_bitmap;
+
+ pbminfo->state = PBM_INITIAL;
+ pbminfo->prefetch_pages = 0;
+ pbminfo->prefetch_target = -1;
+ }
+
ExecScanReScan(&node->ss);
/*
@@ -514,6 +763,10 @@ ExecEndBitmapHeapScan(BitmapHeapScanState *node)
tbm_end_iterate(node->tbmiterator);
if (node->prefetch_iterator)
tbm_end_iterate(node->prefetch_iterator);
+ if (node->stbmiterator)
+ tbm_end_shared_iterate(node->stbmiterator);
+ if (node->sprefetch_iterator)
+ tbm_end_shared_iterate(node->sprefetch_iterator);
if (node->tbm)
tbm_free(node->tbm);
@@ -567,6 +820,7 @@ ExecInitBitmapHeapScan(BitmapHeapScan *node, EState *estate, int eflags)
scanstate->prefetch_target = 0;
/* may be updated below */
scanstate->prefetch_maximum = target_prefetch_pages;
+ scanstate->parallel_bitmap = NULL;
/*
* Miscellaneous initialization
@@ -651,3 +905,160 @@ ExecInitBitmapHeapScan(BitmapHeapScan *node, EState *estate, int eflags)
*/
return scanstate;
}
+
+/*----------------
+ * pbms_is_leader
+ *
+ * If PBMState is PBM_INITIAL then we become the leader and set the state
+ * to PBM_INPROGRESS. Otherwise we become the worker therefore we need to
+ * wait for the leader to wake us up and also PBMState should be
+ * PBM_FINISHED.
+ * ---------------
+ */
+static bool
+pbms_is_leader(ParallelBitmapInfo *pbminfo)
+{
+ bool needWait = false;
+ bool leader = false;
+
+ while (1)
+ {
+ /*---------------
+ * Check the current state
+ * If state is
+ * PBM_INITIAL : then we become leader and set it to PBM_INPROGRESS
+ * PBM_INPROGRESS : then we need to wait till leader creates BITMAP
+ * PBM_FINISHED : BITMAP is ready so no need to wait.
+ *---------------
+ */
+ SpinLockAcquire(&pbminfo->state_mutex);
+
+ if (pbminfo->state == PBM_INITIAL)
+ {
+ pbminfo->state = PBM_INPROGRESS;
+ leader = true;
+ }
+ else if (pbminfo->state == PBM_INPROGRESS)
+ needWait = true;
+ else
+ needWait = false;
+
+ SpinLockRelease(&pbminfo->state_mutex);
+
+ /* If we are leader or leader has already created a TIDBITMAP */
+ if (leader || !needWait)
+ break;
+
+ /* Sleep until leader send wake up signal */
+ ConditionVariableSleep(&pbminfo->cv, WAIT_EVENT_PARALLEL_BITMAP_SCAN);
+ }
+
+ ConditionVariableCancelSleep();
+
+ return leader;
+}
+
+/*-------------------
+ * pbms_set_parallel
+ *
+ * Recursively process the node and set the parallel flag. This flag
+ * will be used to indicate the underlying tidbitmap layer to allocate
+ * pagetable elements from DSA.
+ * -----------------
+ */
+static void
+pbms_set_parallel(PlanState *node)
+{
+ /*
+ * In case of BitmapAnd, set first bitmap index scan node as parallel
+ * because only first node will create the main bitmap other node's bitmap
+ * will be merged to the first bitmap thus no need to create them in shared
+ * memory. BitmapOr and BitmapIndex will create the TidBitmap so set the
+ * parallel flag true.
+ */
+ switch (node->type)
+ {
+ case T_BitmapIndexScanState:
+ ((BitmapIndexScanState *) node)->biss_Parallel = true;
+ break;
+ case T_BitmapOrState:
+ ((BitmapOrState *) node)->is_parallel = true;;
+ break;
+ case T_BitmapAndState:
+ pbms_set_parallel(((BitmapAndState *) node)->bitmapplans[0]);
+ break;
+ default:
+ break;
+ }
+}
+
+/* ----------------------------------------------------------------
+ * ExecBitmapHeapEstimate
+ *
+ * estimates the space required to serialize bitmap scan node.
+ * ----------------------------------------------------------------
+ */
+void
+ExecBitmapHeapEstimate(BitmapHeapScanState *node,
+ ParallelContext *pcxt)
+{
+ EState *estate = node->ss.ps.state;
+
+ /* Estimate the size for sharing parallel Bitmap info. */
+ node->pscan_len = add_size(offsetof(ParallelBitmapInfo,
+ phs_snapshot_data),
+ EstimateSnapshotSpace(estate->es_snapshot));
+
+ shm_toc_estimate_chunk(&pcxt->estimator, node->pscan_len);
+ shm_toc_estimate_keys(&pcxt->estimator, 1);
+}
+
+/* ----------------------------------------------------------------
+ * ExecBitmapHeapInitializeDSM
+ *
+ * Set up a parallel bitmap heap scan descriptor.
+ * ----------------------------------------------------------------
+ */
+void
+ExecBitmapHeapInitializeDSM(BitmapHeapScanState *node,
+ ParallelContext *pcxt)
+{
+ ParallelBitmapInfo *pbminfo;
+ EState *estate = node->ss.ps.state;
+
+ pbminfo = shm_toc_allocate(pcxt->toc, node->pscan_len);
+
+ /* Initialize mutex to protect prefetch pages and target */
+ SpinLockInit(&pbminfo->prefetch_mutex);
+ pbminfo->prefetch_pages = 0;
+ pbminfo->prefetch_target = -1;
+
+ /* Initialize mutex to protect current state */
+ SpinLockInit(&pbminfo->state_mutex);
+ pbminfo->state = PBM_INITIAL;
+
+ ConditionVariableInit(&pbminfo->cv);
+ SerializeSnapshot(estate->es_snapshot, pbminfo->phs_snapshot_data);
+ shm_toc_insert(pcxt->toc, node->ss.ps.plan->plan_node_id, pbminfo);
+
+ node->parallel_bitmap = pbminfo;
+}
+
+/* ----------------------------------------------------------------
+ * ExecBitmapHeapInitializeWorker
+ *
+ * Copy relevant information from TOC into planstate.
+ * ----------------------------------------------------------------
+ */
+void
+ExecBitmapHeapInitializeWorker(BitmapHeapScanState *node, shm_toc *toc)
+{
+ ParallelBitmapInfo *pbminfo;
+ Snapshot snapshot;
+
+ pbminfo = shm_toc_lookup(toc, node->ss.ps.plan->plan_node_id);
+ node->parallel_bitmap = pbminfo;
+
+ snapshot = RestoreSnapshot(pbminfo->phs_snapshot_data);
+ heap_update_snapshot(node->ss.ss_currentScanDesc, snapshot);
+}
diff --git a/src/backend/executor/nodeBitmapIndexscan.c b/src/backend/executor/nodeBitmapIndexscan.c
index 94bb289..2f01cad 100644
--- a/src/backend/executor/nodeBitmapIndexscan.c
+++ b/src/backend/executor/nodeBitmapIndexscan.c
@@ -78,7 +78,8 @@ MultiExecBitmapIndexScan(BitmapIndexScanState *node)
else
{
/* XXX should we use less than work_mem for this? */
- tbm = tbm_create(work_mem * 1024L, NULL);
+ tbm = tbm_create(work_mem * 1024L,
+ node->biss_Parallel ? node->ss.ps.state->es_query_dsa: NULL);
}
/*
diff --git a/src/backend/executor/nodeBitmapOr.c b/src/backend/executor/nodeBitmapOr.c
index 1d280be..e8f110b 100644
--- a/src/backend/executor/nodeBitmapOr.c
+++ b/src/backend/executor/nodeBitmapOr.c
@@ -129,7 +129,8 @@ MultiExecBitmapOr(BitmapOrState *node)
if (result == NULL) /* first subplan */
{
/* XXX should we use less than work_mem for this? */
- result = tbm_create(work_mem * 1024L, NULL);
+ result = tbm_create(work_mem * 1024L, node->is_parallel ?
+ node->ps.state->es_query_dsa: NULL);
}
((BitmapIndexScanState *) subnode)->biss_result = result;
diff --git a/src/backend/optimizer/path/allpaths.c b/src/backend/optimizer/path/allpaths.c
index 85505c5..b7f7ce7 100644
--- a/src/backend/optimizer/path/allpaths.c
+++ b/src/backend/optimizer/path/allpaths.c
@@ -2877,6 +2877,31 @@ remove_unused_subquery_outputs(Query *subquery, RelOptInfo *rel)
}
/*
+ * create_partial_bitmap_paths
+ * Build partial access paths for parallel scan of a plain relation
+ */
+void
+create_partial_bitmap_paths(PlannerInfo *root, RelOptInfo *rel,
+ Path *bitmapqual)
+{
+ int parallel_workers;
+ double pages_fetched;
+
+ /* Compute heap pages for bitmap heap scan */
+ pages_fetched = compute_bitmap_pages(root, rel, bitmapqual, 1.0,
+ NULL, NULL);
+
+ parallel_workers = compute_parallel_worker(rel, pages_fetched, 0);
+
+ /* If any limit was set to zero, the user doesn't want a parallel scan. */
+ if (parallel_workers <= 0)
+ return;
+
+ add_partial_path(rel, (Path *) create_bitmap_heap_path(root, rel,
+ bitmapqual, rel->lateral_relids, 1.0, parallel_workers));
+}
+
+/*
* Compute the number of parallel workers that should be used to scan a
* relation. We compute the parallel workers based on the size of the heap to
* be scanned and the size of the index to be scanned, then choose a minimum
diff --git a/src/backend/optimizer/path/costsize.c b/src/backend/optimizer/path/costsize.c
index a43daa7..f81e469 100644
--- a/src/backend/optimizer/path/costsize.c
+++ b/src/backend/optimizer/path/costsize.c
@@ -816,6 +816,7 @@ cost_bitmap_heap_scan(Path *path, PlannerInfo *root, RelOptInfo *baserel,
QualCost qpqual_cost;
Cost cpu_per_tuple;
Cost cost_per_page;
+ Cost cpu_run_cost;
double tuples_fetched;
double pages_fetched;
double spc_seq_page_cost,
@@ -877,8 +878,32 @@ cost_bitmap_heap_scan(Path *path, PlannerInfo *root, RelOptInfo *baserel,
startup_cost += qpqual_cost.startup;
cpu_per_tuple = cpu_tuple_cost + qpqual_cost.per_tuple;
+ cpu_run_cost = cpu_per_tuple * tuples_fetched;
- run_cost += cpu_per_tuple * tuples_fetched;
+ /* Adjust costing for parallelism, if used. */
+ if (path->parallel_workers > 0)
+ {
+ double parallel_divisor = get_parallel_divisor(path);
+
+ /* The CPU cost is divided among all the workers. */
+ cpu_run_cost /= parallel_divisor;
+
+ /*
+ * It may be possible to amortize some of the I/O cost, but probably
+ * not very much, because most operating systems already do aggressive
+ * prefetching. For now, we assume that the disk run cost can't be
+ * amortized at all.
+ */
+
+ /*
+ * In the case of a parallel plan, the row count needs to represent
+ * the number of tuples processed per worker.
+ */
+ path->rows = clamp_row_est(path->rows / parallel_divisor);
+ }
+
+
+ run_cost += cpu_run_cost;
/* tlist eval costs are paid per output row, not per tuple scanned */
startup_cost += path->pathtarget->cost.startup;
diff --git a/src/backend/optimizer/path/indxpath.c b/src/backend/optimizer/path/indxpath.c
index 5283468..630c501 100644
--- a/src/backend/optimizer/path/indxpath.c
+++ b/src/backend/optimizer/path/indxpath.c
@@ -337,8 +337,12 @@ create_index_paths(PlannerInfo *root, RelOptInfo *rel)
bitmapqual = choose_bitmap_and(root, rel, bitindexpaths);
bpath = create_bitmap_heap_path(root, rel, bitmapqual,
- rel->lateral_relids, 1.0);
+ rel->lateral_relids, 1.0, 0);
add_path(rel, (Path *) bpath);
+
+ /* create a partial bitmap heap path */
+ if (rel->consider_parallel && rel->lateral_relids == NULL)
+ create_partial_bitmap_paths(root, rel, bitmapqual);
}
/*
@@ -410,7 +414,7 @@ create_index_paths(PlannerInfo *root, RelOptInfo *rel)
required_outer = get_bitmap_tree_required_outer(bitmapqual);
loop_count = get_loop_count(root, rel->relid, required_outer);
bpath = create_bitmap_heap_path(root, rel, bitmapqual,
- required_outer, loop_count);
+ required_outer, loop_count, 0);
add_path(rel, (Path *) bpath);
}
}
@@ -1557,6 +1561,11 @@ bitmap_scan_cost_est(PlannerInfo *root, RelOptInfo *rel, Path *ipath)
bpath.path.pathkeys = NIL;
bpath.bitmapqual = ipath;
+ /*
+ * Check the cost of temporary path without considering parallelism.
+ * Parallel bitmap heap path will be considered at later stage.
+ */
+ bpath.path.parallel_workers = 0;
cost_bitmap_heap_scan(&bpath.path, root, rel,
bpath.path.param_info,
ipath,
@@ -1599,6 +1608,12 @@ bitmap_and_cost_est(PlannerInfo *root, RelOptInfo *rel, List *paths)
bpath.path.pathkeys = NIL;
bpath.bitmapqual = (Path *) &apath;
+ /*
+ * Check the cost of temporary path without considering parallelism.
+ * Parallel bitmap heap path will be considered at later stage.
+ */
+ bpath.path.parallel_workers = 0;
+
/* Now we can do cost_bitmap_heap_scan */
cost_bitmap_heap_scan(&bpath.path, root, rel,
bpath.path.param_info,
diff --git a/src/backend/optimizer/plan/createplan.c b/src/backend/optimizer/plan/createplan.c
index 997bdcf..6d152f7 100644
--- a/src/backend/optimizer/plan/createplan.c
+++ b/src/backend/optimizer/plan/createplan.c
@@ -2834,7 +2834,7 @@ create_bitmap_subplan(PlannerInfo *root, Path *bitmapqual,
plan->plan_rows =
clamp_row_est(ipath->indexselectivity * ipath->path.parent->tuples);
plan->plan_width = 0; /* meaningless */
- plan->parallel_aware = false;
+ plan->parallel_aware = bitmapqual->parallel_aware;
*qual = get_actual_clauses(ipath->indexclauses);
*indexqual = get_actual_clauses(ipath->indexquals);
foreach(l, ipath->indexinfo->indpred)
diff --git a/src/backend/optimizer/util/pathnode.c b/src/backend/optimizer/util/pathnode.c
index f440875..20b9a59 100644
--- a/src/backend/optimizer/util/pathnode.c
+++ b/src/backend/optimizer/util/pathnode.c
@@ -1071,7 +1071,8 @@ create_bitmap_heap_path(PlannerInfo *root,
RelOptInfo *rel,
Path *bitmapqual,
Relids required_outer,
- double loop_count)
+ double loop_count,
+ int parallel_degree)
{
BitmapHeapPath *pathnode = makeNode(BitmapHeapPath);
@@ -1080,9 +1081,9 @@ create_bitmap_heap_path(PlannerInfo *root,
pathnode->path.pathtarget = rel->reltarget;
pathnode->path.param_info = get_baserel_parampathinfo(root, rel,
required_outer);
- pathnode->path.parallel_aware = false;
+ pathnode->path.parallel_aware = parallel_degree > 0 ? true : false;
pathnode->path.parallel_safe = rel->consider_parallel;
- pathnode->path.parallel_workers = 0;
+ pathnode->path.parallel_workers = parallel_degree;
pathnode->path.pathkeys = NIL; /* always unordered */
pathnode->bitmapqual = bitmapqual;
@@ -3258,7 +3259,7 @@ reparameterize_path(PlannerInfo *root, Path *path,
rel,
bpath->bitmapqual,
required_outer,
- loop_count);
+ loop_count, 0);
}
case T_SubqueryScan:
{
diff --git a/src/backend/postmaster/pgstat.c b/src/backend/postmaster/pgstat.c
index 7176cf1..f8eef0b 100644
--- a/src/backend/postmaster/pgstat.c
+++ b/src/backend/postmaster/pgstat.c
@@ -3392,6 +3392,9 @@ pgstat_get_wait_ipc(WaitEventIPC w)
case WAIT_EVENT_PARALLEL_FINISH:
event_name = "ParallelFinish";
break;
+ case WAIT_EVENT_PARALLEL_BITMAP_SCAN:
+ event_name = "ParallelBitmapScan";
+ break;
case WAIT_EVENT_SAFE_SNAPSHOT:
event_name = "SafeSnapshot";
break;
diff --git a/src/include/access/heapam.h b/src/include/access/heapam.h
index a864f78..7e85510 100644
--- a/src/include/access/heapam.h
+++ b/src/include/access/heapam.h
@@ -179,6 +179,7 @@ extern void simple_heap_update(Relation relation, ItemPointer otid,
HeapTuple tup);
extern void heap_sync(Relation relation);
+extern void heap_update_snapshot(HeapScanDesc scan, Snapshot snapshot);
/* in heap/pruneheap.c */
extern void heap_page_prune_opt(Relation relation, Buffer buffer);
diff --git a/src/include/executor/nodeBitmapHeapscan.h b/src/include/executor/nodeBitmapHeapscan.h
index d7659b9..465c58e 100644
--- a/src/include/executor/nodeBitmapHeapscan.h
+++ b/src/include/executor/nodeBitmapHeapscan.h
@@ -15,10 +15,17 @@
#define NODEBITMAPHEAPSCAN_H
#include "nodes/execnodes.h"
+#include "access/parallel.h"
extern BitmapHeapScanState *ExecInitBitmapHeapScan(BitmapHeapScan *node, EState *estate, int eflags);
extern TupleTableSlot *ExecBitmapHeapScan(BitmapHeapScanState *node);
extern void ExecEndBitmapHeapScan(BitmapHeapScanState *node);
extern void ExecReScanBitmapHeapScan(BitmapHeapScanState *node);
+extern void ExecBitmapHeapEstimate(BitmapHeapScanState *node,
+ ParallelContext *pcxt);
+extern void ExecBitmapHeapInitializeDSM(BitmapHeapScanState *node,
+ ParallelContext *pcxt);
+extern void ExecBitmapHeapInitializeWorker(BitmapHeapScanState *node,
+ shm_toc *toc);
#endif /* NODEBITMAPHEAPSCAN_H */
diff --git a/src/include/nodes/execnodes.h b/src/include/nodes/execnodes.h
index 42c6c58..56f4436 100644
--- a/src/include/nodes/execnodes.h
+++ b/src/include/nodes/execnodes.h
@@ -26,6 +26,8 @@
#include "utils/sortsupport.h"
#include "utils/tuplestore.h"
#include "utils/tuplesort.h"
+#include "nodes/tidbitmap.h"
+#include "storage/condition_variable.h"
/* ----------------
@@ -1261,6 +1263,7 @@ typedef struct BitmapOrState
PlanState ps; /* its first field is NodeTag */
PlanState **bitmapplans; /* array of PlanStates for my inputs */
int nplans; /* number of input plans */
+ bool is_parallel; /* create shared tbm if it's set */
} BitmapOrState;
/* ----------------------------------------------------------------
@@ -1427,6 +1430,59 @@ typedef struct IndexOnlyScanState
long ioss_HeapFetches;
} IndexOnlyScanState;
+
+/* ----------------
+ * PBMState information : Current status of the TIDBitmap creation during
+ * parallel bitmap heap scan.
+ *
+ * PBM_INITIAL TIDBitmap creation is not yet started, so
+ * first worker to see this state will become
+ * leader and will create TIDbitmap. This will
+ * also set the state to PBM_INPROGRESS.
+ * PBM_INPROGRESS TIDBitmap creation is already in progress,
+ * so workers need to sleep until leader set the
+ * state to PBM_FINISHED and wake us up.
+ * PBM_FINISHED TIDBitmap creation is done, so now all worker
+ * can proceed to iterate over TIDBitmap.
+ * ----------------
+ */
+typedef enum
+{
+ PBM_INITIAL,
+ PBM_INPROGRESS,
+ PBM_FINISHED
+} PBMState;
+
+/* ----------------
+ * ParallelBitmapInfo information
+ * relid OID of relation to scan
+ * prefetch_mutex mutual exclusion for prefetch members
+ * (prefetch_iterator, prefetch_pages and
+ * prefetch_target)
+ * prefetch_pages # pages prefetch iterator is ahead of current
+ * prefetch_target current target prefetch distance
+ * state_mutex mutual exclusion for state field
+ * cv conditional wait variable
+ * state current state of the TIDBitmap
+ * tbmiterator iterator for scanning current pages
+ * prefetch_iterator iterator for prefetching ahead of current page
+ * phs_snapshot_data snapshot data shared to worker
+ * ----------------
+ */
+typedef struct ParallelBitmapInfo
+{
+ Oid relid;
+ slock_t prefetch_mutex;
+ slock_t state_mutex;
+ int prefetch_pages;
+ int prefetch_target;
+ ConditionVariable cv;
+ PBMState state;
+ dsa_pointer tbmiterator;
+ dsa_pointer prefetch_iterator;
+ char phs_snapshot_data[FLEXIBLE_ARRAY_MEMBER];
+} ParallelBitmapInfo;
+
/* ----------------
* BitmapIndexScanState information
*
@@ -1441,6 +1497,8 @@ typedef struct IndexOnlyScanState
* RuntimeContext expr context for evaling runtime Skeys
* RelationDesc index relation descriptor
* ScanDesc index scan descriptor
+ * Parallel Under parallel Bitmap heap so need to create shared
+ * TIDBitmap
* ----------------
*/
typedef struct BitmapIndexScanState
@@ -1457,6 +1515,7 @@ typedef struct BitmapIndexScanState
ExprContext *biss_RuntimeContext;
Relation biss_RelationDesc;
IndexScanDesc biss_ScanDesc;
+ bool biss_Parallel;
} BitmapIndexScanState;
/* ----------------
@@ -1472,7 +1531,9 @@ typedef struct BitmapIndexScanState
* prefetch_pages # pages prefetch iterator is ahead of current
* prefetch_target current target prefetch distance
* prefetch_maximum maximum value for prefetch_target
- * ----------------
+ * pscan_len size of the shared memory for parallel bitmap
+ * parallel_bitmap shared memory for parallel bitmap scan
+ *----------------
*/
typedef struct BitmapHeapScanState
{
@@ -1487,6 +1548,10 @@ typedef struct BitmapHeapScanState
int prefetch_pages;
int prefetch_target;
int prefetch_maximum;
+ Size pscan_len;
+ TBMSharedIterator *stbmiterator;
+ TBMSharedIterator *sprefetch_iterator;
+ ParallelBitmapInfo *parallel_bitmap;
} BitmapHeapScanState;
/* ----------------
diff --git a/src/include/optimizer/pathnode.h b/src/include/optimizer/pathnode.h
index 7b41317..cfef818 100644
--- a/src/include/optimizer/pathnode.h
+++ b/src/include/optimizer/pathnode.h
@@ -52,7 +52,8 @@ extern BitmapHeapPath *create_bitmap_heap_path(PlannerInfo *root,
RelOptInfo *rel,
Path *bitmapqual,
Relids required_outer,
- double loop_count);
+ double loop_count,
+ int parallel_degree);
extern BitmapAndPath *create_bitmap_and_path(PlannerInfo *root,
RelOptInfo *rel,
List *bitmapquals);
diff --git a/src/include/optimizer/paths.h b/src/include/optimizer/paths.h
index 81e7a42..51d6c09 100644
--- a/src/include/optimizer/paths.h
+++ b/src/include/optimizer/paths.h
@@ -54,6 +54,8 @@ extern RelOptInfo *standard_join_search(PlannerInfo *root, int levels_needed,
List *initial_rels);
extern void generate_gather_paths(PlannerInfo *root, RelOptInfo *rel);
+extern void create_partial_bitmap_paths(PlannerInfo *root, RelOptInfo *rel,
+ Path *bitmapqual);
#ifdef OPTIMIZER_DEBUG
extern void debug_print_rel(PlannerInfo *root, RelOptInfo *rel);
diff --git a/src/include/pgstat.h b/src/include/pgstat.h
index de8225b..c52c864 100644
--- a/src/include/pgstat.h
+++ b/src/include/pgstat.h
@@ -786,6 +786,7 @@ typedef enum
WAIT_EVENT_MQ_RECEIVE,
WAIT_EVENT_MQ_SEND,
WAIT_EVENT_PARALLEL_FINISH,
+ WAIT_EVENT_PARALLEL_BITMAP_SCAN,
WAIT_EVENT_SAFE_SNAPSHOT,
WAIT_EVENT_SYNC_REP
} WaitEventIPC;