diff --git a/doc/src/sgml/monitoring.sgml b/doc/src/sgml/monitoring.sgml
index 3de489e..4684cdc 100644
--- a/doc/src/sgml/monitoring.sgml
+++ b/doc/src/sgml/monitoring.sgml
@@ -1234,6 +1234,10 @@ postgres 27093 0.0 0.0 30096 2752 ? Ss 11:34 0:00 postgres: ser
Waiting for parallel workers to finish computing.
+ ParallelBitmapScan>
+ Waiting for the leader to complete the TID bitmap.
+
+
SafeSnapshot>
Waiting for a snapshot for a READ ONLY DEFERRABLE> transaction.
diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c
index b019bc1..6feda21 100644
--- a/src/backend/access/heap/heapam.c
+++ b/src/backend/access/heap/heapam.c
@@ -1753,6 +1753,22 @@ retry:
}
/* ----------------
+ * heap_update_snapshot
+ *
+ * Update snapshot info in heap scan descriptor.
+ * ----------------
+ */
+void
+heap_update_snapshot(HeapScanDesc scan, Snapshot snapshot)
+{
+ Assert(IsMVCCSnapshot(snapshot));
+
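+ /*
+ * Register the snapshot and mark it as temporary, so that it is
+ * unregistered automatically when the scan descriptor is destroyed.
+ */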
+ RegisterSnapshot(snapshot);
+ scan->rs_snapshot = snapshot;
+ scan->rs_temp_snap = true;
+}
+
+/* ----------------
* heap_getnext - retrieve next tuple in scan
*
* Fix to work with index relations.
diff --git a/src/backend/executor/execParallel.c b/src/backend/executor/execParallel.c
index ee1d76e..802a517 100644
--- a/src/backend/executor/execParallel.c
+++ b/src/backend/executor/execParallel.c
@@ -29,6 +29,7 @@
#include "executor/nodeForeignscan.h"
#include "executor/nodeSeqscan.h"
#include "executor/tqueue.h"
+#include "executor/nodeBitmapHeapscan.h"
#include "nodes/nodeFuncs.h"
#include "optimizer/planmain.h"
#include "optimizer/planner.h"
@@ -204,6 +205,10 @@ ExecParallelEstimate(PlanState *planstate, ExecParallelEstimateContext *e)
ExecCustomScanEstimate((CustomScanState *) planstate,
e->pcxt);
break;
+ case T_BitmapHeapScanState:
+ ExecBitmapHeapEstimate((BitmapHeapScanState *) planstate,
+ e->pcxt);
+ break;
default:
break;
}
@@ -256,6 +261,11 @@ ExecParallelInitializeDSM(PlanState *planstate,
ExecCustomScanInitializeDSM((CustomScanState *) planstate,
d->pcxt);
break;
+ case T_BitmapHeapScanState:
+ ExecBitmapHeapInitializeDSM((BitmapHeapScanState *) planstate,
+ d->pcxt);
+ break;
default:
break;
}
@@ -727,6 +737,10 @@ ExecParallelInitializeWorker(PlanState *planstate, shm_toc *toc)
ExecCustomScanInitializeWorker((CustomScanState *) planstate,
toc);
break;
+ case T_BitmapHeapScanState:
+ ExecBitmapHeapInitializeWorker(
+ (BitmapHeapScanState *) planstate, toc);
+ break;
default:
break;
}
diff --git a/src/backend/executor/nodeBitmapHeapscan.c b/src/backend/executor/nodeBitmapHeapscan.c
index 449aacb..5cc5c75 100644
--- a/src/backend/executor/nodeBitmapHeapscan.c
+++ b/src/backend/executor/nodeBitmapHeapscan.c
@@ -47,11 +47,15 @@
#include "utils/spccache.h"
#include "utils/snapmgr.h"
#include "utils/tqual.h"
-
+#include "miscadmin.h"
+#include "pgstat.h"
static TupleTableSlot *BitmapHeapNext(BitmapHeapScanState *node);
static void bitgetpage(HeapScanDesc scan, TBMIterateResult *tbmres);
-
+static bool pbms_is_leader(ParallelBitmapInfo *pbminfo);
+static TBMIterateResult *pbms_parallel_iterate(TBMIterator *iterator,
+ ParallelIterator *parallel_iterator);
+static void pbms_set_parallel(PlanState *node);
/* ----------------------------------------------------------------
* BitmapHeapNext
@@ -67,12 +71,18 @@ BitmapHeapNext(BitmapHeapScanState *node)
TIDBitmap *tbm;
TBMIterator *tbmiterator;
TBMIterateResult *tbmres;
-
+ ParallelBitmapInfo *pbminfo = node->parallel_bitmap;
#ifdef USE_PREFETCH
TBMIterator *prefetch_iterator;
#endif
OffsetNumber targoffset;
TupleTableSlot *slot;
+ ParallelTIDBitmap *parallel_tbm = NULL;
+
+ /* Get the parallel TBM address in shared memory using offset */
+ if (pbminfo)
+ parallel_tbm = (ParallelTIDBitmap*)((char *)pbminfo +
+ pbminfo->ptbm_offset);
/*
* extract necessary information from index scan node
@@ -87,6 +97,35 @@ BitmapHeapNext(BitmapHeapScanState *node)
prefetch_iterator = node->prefetch_iterator;
#endif
+ /* --------------------------------------------------------------------
+ * Parallel Bitmap Heap Scan Algorithm
+ *
+ * The first worker to see the parallel bitmap info state as PBM_INITIAL
+ * becomes the leader and sets the state to PBM_INPROGRESS. All other
+ * workers that see the state as PBM_INPROGRESS wait for the leader to
+ * complete the TIDBitmap.
+ *
+ * Leader processing:
+ * Create the TIDBitmap using DSA memory.
+ * Copy the local TIDBitmap information into ParallelBitmapInfo
+ * so that the other workers can see it.
+ * Set the state to PBM_FINISHED.
+ * Wake up the other workers.
+ *
+ * Other worker processing:
+ * Wait until the leader creates the shared TIDBitmap.
+ * Copy the TIDBitmap information from ParallelBitmapInfo into the
+ * local TIDBitmap.
+ *
+ * Iterate and process the pages:
+ * a) In this phase each worker iterates over the page and chunk
+ * arrays and selects heap pages one by one. If prefetching is
+ * enabled there are two iterators.
+ * b) Since multiple workers iterate over the same page and chunk
+ * arrays we need a shared iterator, so we grab a spinlock and
+ * iterate while holding it.
+ * --------------------------------------------------------------------
+ */
+
/*
* If we haven't yet performed the underlying index scan, do it, and begin
* the iteration over the bitmap.
@@ -101,7 +140,42 @@ BitmapHeapNext(BitmapHeapScanState *node)
*/
if (tbm == NULL)
{
- tbm = (TIDBitmap *) MultiExecProcNode(outerPlanState(node));
+ /*
+ * Process the lower-level node only if we are running in
+ * non-parallel mode or we are the leader worker.
+ *
+ * In parallel mode the leader returns from pbms_is_leader()
+ * immediately, while all other workers are blocked there until
+ * the leader wakes them up.
+ */
+ if (pbminfo == NULL || pbms_is_leader(pbminfo))
+ {
+ /*
+ * If we are in parallel mode, set the parallel flag in the
+ * lower-level bitmap index scan node before recursively
+ * processing the outer node. The bitmap index node will later
+ * use this flag to tell the tidbitmap that it needs to create
+ * a shared page table.
+ */
+ if (pbminfo)
+ {
+ node->is_leader = true;
+ pbms_set_parallel(outerPlanState(node));
+ }
+
+ tbm = (TIDBitmap *) MultiExecProcNode(outerPlanState(node));
+ }
+ else
+ {
+ /*
+ * By this time the leader has already created the shared TBM.
+ * Here we need to create a local TBM and copy the information
+ * from the shared location. We also need to attach to the shared
+ * page table using the information stored in parallel_tbm
+ * (shared memory).
+ */
+ tbm = tbm_create(work_mem * 1024L);
+ tbm_set_parallel(tbm, node->ss.ps.state->es_query_area);
+ tbm_restore_shared_members(tbm, parallel_tbm);
+ }
if (!tbm || !IsA(tbm, TIDBitmap))
elog(ERROR, "unrecognized result from subplan");
@@ -118,19 +192,44 @@ BitmapHeapNext(BitmapHeapScanState *node)
node->prefetch_target = -1;
}
#endif /* USE_PREFETCH */
+
+ /*
+ * If we are in parallel mode and we are the leader worker,
+ * copy the local TBM information to the shared location and
+ * wake up the other workers.
+ */
+ if (node->is_leader)
+ {
+ /*
+ * Copy the local TBM information into shared memory before
+ * waking up the other workers. The other workers will create
+ * their own TBMs and copy the information from shared memory.
+ */
+ tbm_update_shared_members(tbm, parallel_tbm);
+
+ /* Change the state under a lock */
+ SpinLockAcquire(&pbminfo->state_mutex);
+ pbminfo->state = PBM_FINISHED;
+ SpinLockRelease(&pbminfo->state_mutex);
+
+ /* Wake up all other workers. */
+ ConditionVariableBroadcast(&pbminfo->cv);
+ }
}
for (;;)
{
Page dp;
ItemId lp;
+ bool need_prefetch = false;
/*
* Get next page of results if needed
*/
if (tbmres == NULL)
{
- node->tbmres = tbmres = tbm_iterate(tbmiterator);
+ node->tbmres = tbmres = pbms_parallel_iterate(tbmiterator,
+ pbminfo ? &pbminfo->tbmiterator : NULL);
if (tbmres == NULL)
{
/* no more entries in the bitmap */
@@ -138,17 +237,43 @@ BitmapHeapNext(BitmapHeapScanState *node)
}
#ifdef USE_PREFETCH
- if (node->prefetch_pages > 0)
+ if (pbminfo)
+ {
+ /*
+ * If we are in parallel mode, acquire prefetch_mutex and check
+ * prefetch_pages at the shared location.
+ */
+ SpinLockAcquire(&pbminfo->prefetch_mutex);
+ if (pbminfo->prefetch_pages > 0)
+ pbminfo->prefetch_pages--;
+ else
+ need_prefetch = true;
+ SpinLockRelease(&pbminfo->prefetch_mutex);
+ }
+ else if (node->prefetch_pages > 0)
{
/* The main iterator has closed the distance by one page */
node->prefetch_pages--;
}
- else if (prefetch_iterator)
+ else
+ need_prefetch = true;
+
+ if (prefetch_iterator && need_prefetch)
{
- /* Do not let the prefetch iterator get behind the main one */
- TBMIterateResult *tbmpre = tbm_iterate(prefetch_iterator);
+ TBMIterateResult *tbmpre;
- if (tbmpre == NULL || tbmpre->blockno != tbmres->blockno)
+ /* Do not let the prefetch iterator get behind the main one */
+ tbmpre = pbms_parallel_iterate(prefetch_iterator,
+ pbminfo ? &pbminfo->prefetch_iterator : NULL);
+ /*
+ * In parallel mode we can only ensure that the prefetch iterator
+ * is not behind the main iterator, but we cannot ensure that the
+ * current blockno in the main iterator and the prefetch iterator
+ * is the same. It's possible that whatever blockno we are
+ * prefetching is being processed by some other worker.
+ */
+ if ((pbminfo == NULL) &&
+ (tbmpre == NULL || tbmpre->blockno != tbmres->blockno))
elog(ERROR, "prefetch and main iterators are out of sync");
}
#endif /* USE_PREFETCH */
@@ -182,20 +307,39 @@ BitmapHeapNext(BitmapHeapScanState *node)
#ifdef USE_PREFETCH
- /*
- * Increase prefetch target if it's not yet at the max. Note that
- * we will increase it to zero after fetching the very first
- * page/tuple, then to one after the second tuple is fetched, then
- * it doubles as later pages are fetched.
- */
- if (node->prefetch_target >= node->prefetch_maximum)
- /* don't increase any further */ ;
- else if (node->prefetch_target >= node->prefetch_maximum / 2)
- node->prefetch_target = node->prefetch_maximum;
- else if (node->prefetch_target > 0)
- node->prefetch_target *= 2;
- else
- node->prefetch_target++;
+ /* Increase prefetch target if it's not yet at the max. */
+ if (node->prefetch_target < node->prefetch_maximum)
+ {
+ int *prefetch_target;
+
+ if (pbminfo == NULL)
+ prefetch_target = &node->prefetch_target;
+ else
+ {
+ prefetch_target = &pbminfo->prefetch_target;
+ /* If we are in parallel mode then grab prefetch_mutex */
+ SpinLockAcquire(&pbminfo->prefetch_mutex);
+ }
+
+ /*
+ * Increase prefetch target if it's not yet at the max. Note
+ * that we will increase it to zero after fetching the very
+ * first page/tuple, then to one after the second tuple is
+ * fetched, then it doubles as later pages are fetched.
+ */
+ if (*prefetch_target >= node->prefetch_maximum)
+ /* don't increase any further */ ;
+ else if (*prefetch_target >= node->prefetch_maximum / 2)
+ *prefetch_target = node->prefetch_maximum;
+ else if (*prefetch_target > 0)
+ *prefetch_target *= 2;
+ else
+ (*prefetch_target)++;
+
+ if (pbminfo != NULL)
+ SpinLockRelease(&pbminfo->prefetch_mutex);
+ }
+
#endif /* USE_PREFETCH */
}
else
@@ -211,8 +355,22 @@ BitmapHeapNext(BitmapHeapScanState *node)
* Try to prefetch at least a few pages even before we get to the
* second page if we don't stop reading after the first tuple.
*/
- if (node->prefetch_target < node->prefetch_maximum)
- node->prefetch_target++;
+ if (pbminfo == NULL)
+ {
+ if (node->prefetch_target < node->prefetch_maximum)
+ node->prefetch_target++;
+ }
+ else
+ {
+ /*
+ * If we are in parallel mode then grab prefetch_mutex
+ * before updating prefetch target.
+ */
+ SpinLockAcquire(&pbminfo->prefetch_mutex);
+ if (pbminfo->prefetch_target < node->prefetch_maximum)
+ pbminfo->prefetch_target++;
+ SpinLockRelease(&pbminfo->prefetch_mutex);
+ }
#endif /* USE_PREFETCH */
}
@@ -236,21 +394,52 @@ BitmapHeapNext(BitmapHeapScanState *node)
*/
if (prefetch_iterator)
{
- while (node->prefetch_pages < node->prefetch_target)
+ int *prefetch_pages;
+ int prefetch_target;
+ TBMIterator *iterator = node->prefetch_iterator;
+ ParallelIterator *parallel_iterator = NULL;
+ ParallelBitmapInfo *pbminfo = node->parallel_bitmap;
+
+ /*
+ * If parallel bitmap info is available, we are running in
+ * parallel mode, so use the parallel iterator for prefetching.
+ */
+ if (pbminfo)
{
- TBMIterateResult *tbmpre = tbm_iterate(prefetch_iterator);
+ parallel_iterator = &pbminfo->prefetch_iterator;
+ prefetch_pages = &pbminfo->prefetch_pages;
+ prefetch_target = pbminfo->prefetch_target;
+ }
+ else
+ {
+ prefetch_pages = &node->prefetch_pages;
+ prefetch_target = node->prefetch_target;
+ }
+ while (*prefetch_pages < prefetch_target)
+ {
+ TBMIterateResult *tbmpre = pbms_parallel_iterate(iterator,
+ parallel_iterator);
if (tbmpre == NULL)
{
/* No more pages to prefetch */
- tbm_end_iterate(prefetch_iterator);
- node->prefetch_iterator = prefetch_iterator = NULL;
+ tbm_end_iterate(iterator);
+ prefetch_iterator = node->prefetch_iterator = NULL;
break;
}
- node->prefetch_pages++;
+
+ if (pbminfo != NULL)
+ SpinLockAcquire(&pbminfo->prefetch_mutex);
+
+ (*prefetch_pages)++;
+
+ if (pbminfo != NULL)
+ SpinLockRelease(&pbminfo->prefetch_mutex);
+
PrefetchBuffer(scan->rs_rd, MAIN_FORKNUM, tbmpre->blockno);
}
}
+
#endif /* USE_PREFETCH */
/*
@@ -465,6 +654,22 @@ ExecReScanBitmapHeapScan(BitmapHeapScanState *node)
node->tbmres = NULL;
node->prefetch_iterator = NULL;
+ /* reset parallel bitmap scan info, if present */
+ if (node->parallel_bitmap)
+ {
+ ParallelBitmapInfo *pbminfo = node->parallel_bitmap;
+
+ pbminfo->state = PBM_INITIAL;
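+
+ /* Reset the shared iterator positions so iteration restarts from scratch */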
+ pbminfo->tbmiterator.schunkbit = 0;
+ pbminfo->tbmiterator.spageptr = 0;
+ pbminfo->tbmiterator.schunkptr = 0;
+ pbminfo->prefetch_iterator.schunkbit = 0;
+ pbminfo->prefetch_iterator.spageptr = 0;
+ pbminfo->prefetch_iterator.schunkptr = 0;
+ pbminfo->prefetch_pages = 0;
+ pbminfo->prefetch_target = -1;
+ }
+
ExecScanReScan(&node->ss);
/*
@@ -567,6 +772,8 @@ ExecInitBitmapHeapScan(BitmapHeapScan *node, EState *estate, int eflags)
scanstate->prefetch_target = 0;
/* may be updated below */
scanstate->prefetch_maximum = target_prefetch_pages;
+ scanstate->is_leader = false;
+ scanstate->parallel_bitmap = NULL;
/*
* Miscellaneous initialization
@@ -653,3 +860,237 @@ ExecInitBitmapHeapScan(BitmapHeapScan *node, EState *estate, int eflags)
*/
return scanstate;
}
+
+/*----------------
+ * pbms_is_leader
+ *
+ * Check whether we are the first to arrive here; if so, we become
+ * the leader, otherwise we wait until the leader creates the shared
+ * TID bitmap and wakes us up.
+ * ---------------
+ */
+static bool
+pbms_is_leader(ParallelBitmapInfo *pbminfo)
+{
+ bool needWait = false;
+ bool queuedSelf = false;
+ bool leader = false;
+
+ for (;;)
+ {
+ SpinLockAcquire(&pbminfo->state_mutex);
+
+ /*
+ * Check the current state. If the state is
+ *
+ * PBM_INITIAL -> we become the leader and set it to PBM_INPROGRESS
+ * PBM_INPROGRESS -> we need to wait until the leader creates the bitmap
+ * PBM_FINISHED -> the bitmap is ready, so there is no need to wait
+ */
+ if (pbminfo->state == PBM_INITIAL)
+ {
+ pbminfo->state = PBM_INPROGRESS;
+ leader = true;
+ }
+ else if (pbminfo->state == PBM_INPROGRESS)
+ needWait = true;
+ else
+ needWait = false;
+
+ SpinLockRelease(&pbminfo->state_mutex);
+
+ /*
+ * Stop looping if we are the leader, or if the leader has already
+ * created the TID bitmap.
+ */
+ if (leader || !needWait)
+ break;
+
+ /* We need to queue */
+ if (queuedSelf)
+ {
+ /* Sleep until the leader sends a wakeup signal */
+ ConditionVariableSleep(
+ &pbminfo->cv, WAIT_EVENT_PARALLEL_BITMAP_SCAN);
+ queuedSelf = false;
+ needWait = false;
+ }
+ else if (needWait)
+ {
+ /* Add ourselves to the wait queue */
+ ConditionVariablePrepareToSleep(&pbminfo->cv);
+ queuedSelf = true;
+ }
+ }
+
+ /* Cancel the sleep before returning */
+ ConditionVariableCancelSleep();
+
+ return leader;
+}
+
+/* ----------------
+ * pbms_iterator_init - initialize parallel bitmap scan iterator
+ *
+ * ----------------
+ */
+static void
+pbms_iterator_init(ParallelIterator *target)
+{
+ SpinLockInit(&target->mutex);
+
+ target->schunkbit = 0;
+ target->schunkptr = 0;
+ target->spageptr = 0;
+}
+
+/*
+ * pbms_parallel_iterate
+ *
+ * Wrapper over tbm_iterate that takes care of copying the iterator
+ * position from, and restoring it to, the shared parallel iterator.
+ */
+static TBMIterateResult *
+pbms_parallel_iterate(TBMIterator *iterator, ParallelIterator *piterator)
+{
+ TBMIterateResult *output;
+
+ /* If not running in parallel mode then directly call tbm_iterate. */
+ if (piterator == NULL)
+ return tbm_iterate(iterator);
+
+ /* We are in parallel mode, so grab the parallel iterator mutex */
+ SpinLockAcquire(&piterator->mutex);
+
+ /*
+ * Copy the position from the shared location into the local
+ * iterator, call tbm_iterate, and then copy the updated position
+ * back into the shared iterator.
+ */
+ iterator->spageptr = piterator->spageptr;
+ iterator->schunkptr = piterator->schunkptr;
+ iterator->schunkbit = piterator->schunkbit;
+ output = tbm_iterate(iterator);
+ piterator->spageptr = iterator->spageptr;
+ piterator->schunkptr = iterator->schunkptr;
+ piterator->schunkbit = iterator->schunkbit;
+
+ SpinLockRelease(&piterator->mutex);
+
+ return output;
+}
+
+/*-------------------
+ * pbms_set_parallel
+ *
+ * Mark the underlying bitmap index scan node as parallel, so that
+ * it creates a shared TIDBitmap.
+ * -----------------
+ */
+static void
+pbms_set_parallel(PlanState *node)
+{
+ /*
+ * In case of BitmapOr or BitmapAnd, set only the first bitmap index
+ * scan node as parallel, because only the first node creates the main
+ * bitmap; the other bitmaps are merged into it, so there is no need
+ * to create them in shared memory.
+ */
+ switch (node->type)
+ {
+ case T_BitmapIndexScanState:
+ ((BitmapIndexScanState*)node)->biss_Parallel = true;
+ break;
+ case T_BitmapOrState:
+ pbms_set_parallel(((BitmapOrState*)node)->bitmapplans[0]);
+ break;
+ case T_BitmapAndState:
+ pbms_set_parallel(((BitmapAndState*)node)->bitmapplans[0]);
+ break;
+ default:
+ break;
+ }
+}
+
+/* ----------------------------------------------------------------
+ * ExecBitmapHeapEstimate
+ *
+ * Estimates the space required for the shared state of a parallel
+ * bitmap heap scan.
+ * ----------------------------------------------------------------
+ */
+void
+ExecBitmapHeapEstimate(BitmapHeapScanState *node,
+ ParallelContext *pcxt)
+{
+ EState *estate = node->ss.ps.state;
+ Size offset = add_size(offsetof(ParallelBitmapInfo,
+ phs_snapshot_data),
+ EstimateSnapshotSpace(estate->es_snapshot));
+
+ /* Estimate the size for sharing parallel TBM info. */
+ node->pscan_len = tbm_estimate_parallel_tidbitmap(offset);
+
+ shm_toc_estimate_chunk(&pcxt->estimator, node->pscan_len);
+ shm_toc_estimate_keys(&pcxt->estimator, 1);
+}
+
+/* ----------------------------------------------------------------
+ * ExecBitmapHeapInitializeDSM
+ *
+ * Set up a parallel bitmap heap scan descriptor.
+ * ----------------------------------------------------------------
+ */
+void
+ExecBitmapHeapInitializeDSM(BitmapHeapScanState *node,
+ ParallelContext *pcxt)
+{
+ ParallelBitmapInfo *pbminfo;
+ EState *estate = node->ss.ps.state;
+ Size offset = add_size(offsetof(ParallelBitmapInfo,
+ phs_snapshot_data),
+ EstimateSnapshotSpace(estate->es_snapshot));
+
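+ /*
+ * The shared state is laid out as a ParallelBitmapInfo (whose flexible
+ * phs_snapshot_data member holds the serialized snapshot) followed by a
+ * ParallelTIDBitmap, so 'offset' is where the TBM info begins.
+ */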
+ pbminfo = shm_toc_allocate(pcxt->toc, node->pscan_len);
+
+ /* Offset to parallel TBM info. */
+ pbminfo->ptbm_offset = offset;
+
+ /* Initialize shared tbmiterator and prefetch_iterator */
+ pbms_iterator_init(&pbminfo->tbmiterator);
+ pbms_iterator_init(&pbminfo->prefetch_iterator);
+
+ /* Initialize mutex to protect prefetch pages and target */
+ SpinLockInit(&pbminfo->prefetch_mutex);
+ pbminfo->prefetch_pages = 0;
+ pbminfo->prefetch_target = -1;
+
+ /* Initialize mutex to protect current state */
+ SpinLockInit(&pbminfo->state_mutex);
+ pbminfo->state = PBM_INITIAL;
+
+ ConditionVariableInit(&pbminfo->cv);
+ SerializeSnapshot(estate->es_snapshot, pbminfo->phs_snapshot_data);
+ shm_toc_insert(pcxt->toc, node->ss.ps.plan->plan_node_id, pbminfo);
+
+ node->parallel_bitmap = pbminfo;
+}
+
+/* ----------------------------------------------------------------
+ * ExecBitmapHeapInitializeWorker
+ *
+ * Copy relevant information from TOC into planstate.
+ * ----------------------------------------------------------------
+ */
+void
+ExecBitmapHeapInitializeWorker(BitmapHeapScanState *node, shm_toc *toc)
+{
+ ParallelBitmapInfo *pbminfo;
+ Snapshot snapshot;
+
+ pbminfo = shm_toc_lookup(toc, node->ss.ps.plan->plan_node_id);
+ node->parallel_bitmap = pbminfo;
+
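+ /* Restore the snapshot serialized by the leader and use it for our scan */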
+ snapshot = RestoreSnapshot(pbminfo->phs_snapshot_data);
+ heap_update_snapshot(node->ss.ss_currentScanDesc, snapshot);
+}
diff --git a/src/backend/executor/nodeBitmapIndexscan.c b/src/backend/executor/nodeBitmapIndexscan.c
index a364098..9cc5088 100644
--- a/src/backend/executor/nodeBitmapIndexscan.c
+++ b/src/backend/executor/nodeBitmapIndexscan.c
@@ -82,6 +82,13 @@ MultiExecBitmapIndexScan(BitmapIndexScanState *node)
}
/*
+ * If the parallel flag is set, mark the TIDBitmap to indicate that
+ * we need a shared page table.
+ */
+ if (node->biss_Parallel)
+ tbm_set_parallel(tbm, node->ss.ps.state->es_query_area);
+
+ /*
* Get TIDs from index and insert into bitmap
*/
while (doscan)
diff --git a/src/backend/nodes/tidbitmap.c b/src/backend/nodes/tidbitmap.c
index ea79de7..5f9ac41 100644
--- a/src/backend/nodes/tidbitmap.c
+++ b/src/backend/nodes/tidbitmap.c
@@ -43,6 +43,12 @@
#include "access/htup_details.h"
#include "nodes/bitmapset.h"
#include "nodes/tidbitmap.h"
+#include "utils/hsearch.h"
+#include "storage/condition_variable.h"
+#include "storage/spin.h"
+#include "storage/lwlock.h"
+#include "nodes/execnodes.h"
+#include "storage/shmem.h"
/*
* The maximum number of tuples per page is not large (typically 256 with
@@ -139,24 +145,28 @@ struct TIDBitmap
/* these are valid when iterating is true: */
PagetableEntry **spages; /* sorted exact-page list, or NULL */
PagetableEntry **schunks; /* sorted lossy-chunk list, or NULL */
+ bool is_shared; /* need to build a shared tbm if set */
+ dsa_pointer dsa_data; /* dsa_pointer to the element array */
+ struct pagetable_alloc *allocator; /* shared memory allocator */
+ int dsa_entries; /* actual allocated entries */
+ dsa_area *area; /* reference to per-query shared memory area */
};
/*
- * When iterating over a bitmap in sorted order, a TBMIterator is used to
- * track our progress. There can be several iterators scanning the same
- * bitmap concurrently. Note that the bitmap becomes read-only as soon as
- * any iterator is created.
+ * Holds shared members of TIDBitmap for parallel bitmap scan.
*/
-struct TBMIterator
+struct ParallelTIDBitmap
{
- TIDBitmap *tbm; /* TIDBitmap we're iterating over */
- int spageptr; /* next spages index */
- int schunkptr; /* next schunks index */
- int schunkbit; /* next bit to check in current schunk */
- TBMIterateResult output; /* MUST BE LAST (because variable-size) */
+ dsa_pointer dsa_data; /* dsa pointers for all kind of pages */
+ int nentries; /* number of entries in pagetable */
+ int maxentries; /* limit on same to meet maxbytes */
+ int npages; /* number of exact entries in pagetable */
+ int nchunks; /* number of lossy entries in pagetable */
+ int dsa_entries; /* total item in dsa_pages */
+ bool inited; /* set true after the leader converts the page
+ * table to the dsa_pointer array */
};
-
/* Local function prototypes */
static void tbm_union_page(TIDBitmap *a, const PagetableEntry *bpage);
static bool tbm_intersect_page(TIDBitmap *a, PagetableEntry *apage,
@@ -168,7 +178,8 @@ static bool tbm_page_is_lossy(const TIDBitmap *tbm, BlockNumber pageno);
static void tbm_mark_page_lossy(TIDBitmap *tbm, BlockNumber pageno);
static void tbm_lossify(TIDBitmap *tbm);
static int tbm_comparator(const void *left, const void *right);
-
+void *tbm_alloc_shared(Size size, void *arg);
+void tbm_free_shared(void *pointer, void *arg);
/*
* Simple inline murmur hash implementation for the exact width required, for
* performance.
@@ -231,6 +242,8 @@ tbm_create(long maxbytes)
tbm->maxentries = (int) nbuckets;
tbm->lossify_start = 0;
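+ /* allocator callbacks are filled in by tbm_create_pagetable if needed */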
+ tbm->allocator = palloc(sizeof(pagetable_alloc));
+
return tbm;
}
@@ -244,7 +257,16 @@ tbm_create_pagetable(TIDBitmap *tbm)
Assert(tbm->status != TBM_HASH);
Assert(tbm->pagetable == NULL);
- tbm->pagetable = pagetable_create(tbm->mcxt, 128, NULL);
+ if (tbm->is_shared)
+ {
+ tbm->allocator->args = tbm;
+ tbm->allocator->HashAlloc = tbm_alloc_shared;
+ tbm->allocator->HashFree = tbm_free_shared;
+ tbm->pagetable = pagetable_create(tbm->mcxt, 128, tbm->allocator);
+ tbm->dsa_entries = tbm->pagetable->size;
+ }
+ else
+ tbm->pagetable = pagetable_create(tbm->mcxt, 128, NULL);
/* If entry1 is valid, push it into the hashtable */
if (tbm->status == TBM_ONE_PAGE)
@@ -271,7 +293,7 @@ tbm_create_pagetable(TIDBitmap *tbm)
void
tbm_free(TIDBitmap *tbm)
{
- if (tbm->pagetable)
+ if (!tbm->is_shared && tbm->pagetable)
pagetable_destroy(tbm->pagetable);
if (tbm->spages)
pfree(tbm->spages);
@@ -612,11 +634,16 @@ tbm_begin_iterate(TIDBitmap *tbm)
iterator->tbm = tbm;
/*
- * Initialize iteration pointers.
+ * Initialize the iteration pointers, but only if this is not a
+ * shared tbm. For a shared tbm, these values are copied from the
+ * shared iterator before each tbm_iterate call.
*/
- iterator->spageptr = 0;
- iterator->schunkptr = 0;
- iterator->schunkbit = 0;
+ if (!tbm->is_shared)
+ {
+ iterator->spageptr = 0;
+ iterator->schunkptr = 0;
+ iterator->schunkbit = 0;
+ }
/*
* If we have a hashtable, create and fill the sorted page lists, unless
@@ -626,7 +653,7 @@ tbm_begin_iterate(TIDBitmap *tbm)
*/
if (tbm->status == TBM_HASH && !tbm->iterating)
{
- pagetable_iterator i;
+ pagetable_iterator itr;
PagetableEntry *page;
int npages;
int nchunks;
@@ -640,15 +667,49 @@ tbm_begin_iterate(TIDBitmap *tbm)
MemoryContextAlloc(tbm->mcxt,
tbm->nchunks * sizeof(PagetableEntry *));
- npages = nchunks = 0;
- pagetable_start_iterate(tbm->pagetable, &i);
- while ((page = pagetable_iterate(tbm->pagetable, &i)) != NULL)
+ /*
+ * If we have a shared TBM, we are running in parallel mode, so
+ * convert the dsa array directly into the page and chunk arrays.
+ */
+ if (tbm->is_shared)
{
- if (page->ischunk)
- tbm->schunks[nchunks++] = page;
- else
- tbm->spages[npages++] = page;
+ PagetableEntry *dsa_entry;
+ int i;
+
+ /*
+ * This step is done by all the workers, including the leader.
+ * Here we need to convert the dsa entry array into the local
+ * page and chunk arrays.
+ */
+ dsa_entry = dsa_get_address(tbm->area, tbm->dsa_data);
+ dsa_entry = (void*)(((char*)dsa_entry) + sizeof(dsa_pointer));
+ npages = nchunks = 0;
+ for (i = 0; i < tbm->dsa_entries; i++)
+ {
+ page = (PagetableEntry*)(dsa_entry + i);
+
+ if (page->status != pagetable_IN_USE)
+ continue;
+
+ if (page->ischunk)
+ tbm->schunks[nchunks++] = page;
+ else
+ tbm->spages[npages++] = page;
+ }
+ }
+ else
+ {
+ npages = nchunks = 0;
+ pagetable_start_iterate(tbm->pagetable, &itr);
+ while ((page = pagetable_iterate(tbm->pagetable, &itr)) != NULL)
+ {
+ if (page->ischunk)
+ tbm->schunks[nchunks++] = page;
+ else
+ tbm->spages[npages++] = page;
+ }
}
+
Assert(npages == tbm->npages);
Assert(nchunks == tbm->nchunks);
if (npages > 1)
@@ -1061,3 +1122,114 @@ tbm_comparator(const void *left, const void *right)
return 1;
return 0;
}
+
+/*
+ * tbm_update_shared_members
+ *
+ * Copy the leader's private tbm state to the shared location. This
+ * must be called before waking up the other workers.
+ */
+void
+tbm_update_shared_members(TIDBitmap *tbm, ParallelTIDBitmap *parallel_tbm)
+{
+ /*
+ * Copy private information to shared location before
+ * waking up the other workers.
+ */
+ parallel_tbm->maxentries = tbm->maxentries;
+ parallel_tbm->nchunks = tbm->nchunks;
+ parallel_tbm->nentries = tbm->nentries;
+ parallel_tbm->npages = tbm->npages;
+ parallel_tbm->dsa_data = tbm->dsa_data;
+ parallel_tbm->dsa_entries = tbm->dsa_entries;
+}
+
+/*
+ * tbm_set_parallel
+ *
+ * Mark the tidbitmap as shared and store the DSA area in it.
+ * Marking the tidbitmap as shared indicates that it should be
+ * created in shared memory. The DSA area will be used to build
+ * the bitmap in a dynamic shared memory hash table.
+ */
+void
+tbm_set_parallel(TIDBitmap *tbm, void *area)
+{
+ tbm->is_shared = true;
+ tbm->area = (dsa_area*)area;
+}
+
+/*
+ * tbm_estimate_parallel_tidbitmap
+ * Estimate size of shared TIDBitmap related info.
+ */
+Size
+tbm_estimate_parallel_tidbitmap(Size size)
+{
+ return add_size(size, sizeof(ParallelTIDBitmap));
+}
+
+/*
+ * tbm_restore_shared_members
+ *
+ * Attach the worker to the shared TID bitmap created by the leader.
+ */
+void
+tbm_restore_shared_members(TIDBitmap *tbm, ParallelTIDBitmap *stbm)
+{
+ tbm->status = TBM_HASH;
+ tbm->nchunks = stbm->nchunks;
+ tbm->nentries = stbm->nentries;
+ tbm->npages = stbm->npages;
+ tbm->maxentries = stbm->maxentries;
+ tbm->dsa_data = stbm->dsa_data;
+ tbm->dsa_entries = stbm->dsa_entries;
+ tbm->is_shared = true;
+}
+
+/*
+ * tbm_alloc_shared
+ *
+ * Memory allocation function for the bitmap hash table.
+ * It allocates memory from DSA and also stores the dsa_pointer in the
+ * allocated memory, so that during free we can directly get the
+ * dsa_pointer and free it.
+ */
+void *
+tbm_alloc_shared(Size size, void *arg)
+{
+ TIDBitmap *tbm = arg;
+ dsa_pointer dsaptr;
+ char *ptr;
+
+ /* Add the size for storing dsa_pointer */
+ dsaptr = dsa_allocate(tbm->area, size + sizeof(dsa_pointer));
+
+ tbm->dsa_data = dsaptr;
+
+ /* Keep track of actual number of entries */
+ if (tbm->pagetable)
+ tbm->dsa_entries = tbm->pagetable->size;
+
+ ptr = dsa_get_address(tbm->area, dsaptr);
+
+ /* Store dsa_pointer */
+ *((dsa_pointer*)ptr) = dsaptr;
+
+ return (ptr + sizeof(dsa_pointer));
+}
+
+/*
+ * tbm_free_shared
+ *
+ * Memory free function for the tid bitmap hash table.
+ */
+void
+tbm_free_shared(void *pointer, void *arg)
+{
+ TIDBitmap *tbm = arg;
+
+ dsa_pointer dsa_data =
+ *((dsa_pointer*)((char*)pointer - sizeof (dsa_pointer)));
+
+ dsa_free(tbm->area, dsa_data);
+}
diff --git a/src/backend/optimizer/path/allpaths.c b/src/backend/optimizer/path/allpaths.c
index e42ef98..058e55a 100644
--- a/src/backend/optimizer/path/allpaths.c
+++ b/src/backend/optimizer/path/allpaths.c
@@ -126,6 +126,7 @@ static void subquery_push_qual(Query *subquery,
static void recurse_push_qual(Node *setOp, Query *topquery,
RangeTblEntry *rte, Index rti, Node *qual);
static void remove_unused_subquery_outputs(Query *subquery, RelOptInfo *rel);
+static int compute_parallel_worker(RelOptInfo *rel, BlockNumber pages);
/*
@@ -678,49 +679,7 @@ create_plain_partial_paths(PlannerInfo *root, RelOptInfo *rel)
{
int parallel_workers;
- /*
- * If the user has set the parallel_workers reloption, use that; otherwise
- * select a default number of workers.
- */
- if (rel->rel_parallel_workers != -1)
- parallel_workers = rel->rel_parallel_workers;
- else
- {
- int parallel_threshold;
-
- /*
- * If this relation is too small to be worth a parallel scan, just
- * return without doing anything ... unless it's an inheritance child.
- * In that case, we want to generate a parallel path here anyway. It
- * might not be worthwhile just for this relation, but when combined
- * with all of its inheritance siblings it may well pay off.
- */
- if (rel->pages < (BlockNumber) min_parallel_relation_size &&
- rel->reloptkind == RELOPT_BASEREL)
- return;
-
- /*
- * Select the number of workers based on the log of the size of the
- * relation. This probably needs to be a good deal more
- * sophisticated, but we need something here for now. Note that the
- * upper limit of the min_parallel_relation_size GUC is chosen to
- * prevent overflow here.
- */
- parallel_workers = 1;
- parallel_threshold = Max(min_parallel_relation_size, 1);
- while (rel->pages >= (BlockNumber) (parallel_threshold * 3))
- {
- parallel_workers++;
- parallel_threshold *= 3;
- if (parallel_threshold > INT_MAX / 3)
- break; /* avoid overflow */
- }
- }
-
- /*
- * In no case use more than max_parallel_workers_per_gather workers.
- */
- parallel_workers = Min(parallel_workers, max_parallel_workers_per_gather);
+ parallel_workers = compute_parallel_worker(rel, rel->pages);
/* If any limit was set to zero, the user doesn't want a parallel scan. */
if (parallel_workers <= 0)
@@ -2866,6 +2825,84 @@ remove_unused_subquery_outputs(Query *subquery, RelOptInfo *rel)
}
}
+/*
+ * create_partial_bitmap_paths
+ * Build a partial bitmap heap path for a parallel scan of the relation
+ */
+void
+create_partial_bitmap_paths(PlannerInfo *root, RelOptInfo *rel,
+ Path *bitmapqual)
+{
+ int parallel_workers;
+ double pages_fetched;
+
+ /* Compute heap pages for bitmap heap scan */
+ pages_fetched = compute_bitmap_pages(root, rel, bitmapqual, 1.0,
+ NULL, NULL);
+
+ parallel_workers = compute_parallel_worker(rel, pages_fetched);
+
+ /* If any limit was set to zero, the user doesn't want a parallel scan. */
+ if (parallel_workers <= 0)
+ return;
+
+ add_partial_path(rel, (Path *) create_bitmap_heap_path(root, rel,
+ bitmapqual, rel->lateral_relids, 1.0, parallel_workers));
+}
+
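+/*
+ * compute_parallel_worker
+ * Compute the number of workers to request for a scan of 'pages' pages,
+ * honoring the parallel_workers reloption if it has been set.
+ */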
+static int
+compute_parallel_worker(RelOptInfo *rel, BlockNumber pages)
+{
+ int parallel_workers;
+
+ /*
+ * If the user has set the parallel_workers reloption, use that; otherwise
+ * select a default number of workers.
+ */
+ if (rel->rel_parallel_workers != -1)
+ parallel_workers = rel->rel_parallel_workers;
+ else
+ {
+ int parallel_threshold;
+
+ /*
+ * If this relation is too small to be worth a parallel scan, just
+ * return without doing anything ... unless it's an inheritance child.
+ * In that case, we want to generate a parallel path here anyway. It
+ * might not be worthwhile just for this relation, but when combined
+ * with all of its inheritance siblings it may well pay off.
+ */
+ if (pages < (BlockNumber) min_parallel_relation_size &&
+ rel->reloptkind == RELOPT_BASEREL)
+ return 0;
+
+ /*
+ * Select the number of workers based on the log of the size of the
+ * relation. This probably needs to be a good deal more
+ * sophisticated, but we need something here for now. Note that the
+ * upper limit of the min_parallel_relation_size GUC is chosen to
+ * prevent overflow here.
+ */
+ parallel_workers = 1;
+ parallel_threshold = Max(min_parallel_relation_size, 1);
+ while (pages >= (BlockNumber) (parallel_threshold * 3))
+ {
+ parallel_workers++;
+ parallel_threshold *= 3;
+ if (parallel_threshold > INT_MAX / 3)
+ break; /* avoid overflow */
+ }
+ }
+
+ /*
+ * In no case use more than max_parallel_workers_per_gather workers.
+ */
+ parallel_workers = Min(parallel_workers, max_parallel_workers_per_gather);
+
+ return parallel_workers;
+}
+
+
/*****************************************************************************
* DEBUG SUPPORT
*****************************************************************************/
diff --git a/src/backend/optimizer/path/costsize.c b/src/backend/optimizer/path/costsize.c
index e42895d..64bb8b6 100644
--- a/src/backend/optimizer/path/costsize.c
+++ b/src/backend/optimizer/path/costsize.c
@@ -161,7 +161,7 @@ static Selectivity get_foreign_key_join_selectivity(PlannerInfo *root,
static void set_rel_width(PlannerInfo *root, RelOptInfo *rel);
static double relation_byte_size(double tuples, int width);
static double page_size(double tuples, int width);
-
+static Cost adjust_cost_for_parallelism(Path *path, Cost cpu_run_cost);
/*
* clamp_row_est
@@ -237,44 +237,7 @@ cost_seqscan(Path *path, PlannerInfo *root,
/* Adjust costing for parallelism, if used. */
if (path->parallel_workers > 0)
- {
- double parallel_divisor = path->parallel_workers;
- double leader_contribution;
-
- /*
- * Early experience with parallel query suggests that when there is
- * only one worker, the leader often makes a very substantial
- * contribution to executing the parallel portion of the plan, but as
- * more workers are added, it does less and less, because it's busy
- * reading tuples from the workers and doing whatever non-parallel
- * post-processing is needed. By the time we reach 4 workers, the
- * leader no longer makes a meaningful contribution. Thus, for now,
- * estimate that the leader spends 30% of its time servicing each
- * worker, and the remainder executing the parallel plan.
- */
- leader_contribution = 1.0 - (0.3 * path->parallel_workers);
- if (leader_contribution > 0)
- parallel_divisor += leader_contribution;
-
- /*
- * In the case of a parallel plan, the row count needs to represent
- * the number of tuples processed per worker. Otherwise, higher-level
- * plan nodes that appear below the gather will be costed incorrectly,
- * because they'll anticipate receiving more rows than any given copy
- * will actually get.
- */
- path->rows = clamp_row_est(path->rows / parallel_divisor);
-
- /* The CPU cost is divided among all the workers. */
- cpu_run_cost /= parallel_divisor;
-
- /*
- * It may be possible to amortize some of the I/O cost, but probably
- * not very much, because most operating systems already do aggressive
- * prefetching. For now, we assume that the disk run cost can't be
- * amortized at all.
- */
- }
+ cpu_run_cost = adjust_cost_for_parallelism(path, cpu_run_cost);
path->startup_cost = startup_cost;
path->total_cost = startup_cost + cpu_run_cost + disk_run_cost;
@@ -831,10 +794,10 @@ cost_bitmap_heap_scan(Path *path, PlannerInfo *root, RelOptInfo *baserel,
Cost startup_cost = 0;
Cost run_cost = 0;
Cost indexTotalCost;
- Selectivity indexSelectivity;
QualCost qpqual_cost;
Cost cpu_per_tuple;
Cost cost_per_page;
+ Cost cpu_run_cost;
double tuples_fetched;
double pages_fetched;
double spc_seq_page_cost,
@@ -855,13 +818,12 @@ cost_bitmap_heap_scan(Path *path, PlannerInfo *root, RelOptInfo *baserel,
if (!enable_bitmapscan)
startup_cost += disable_cost;
- /*
- * Fetch total cost of obtaining the bitmap, as well as its total
- * selectivity.
- */
- cost_bitmap_tree_node(bitmapqual, &indexTotalCost, &indexSelectivity);
+ pages_fetched = compute_bitmap_pages(root, baserel, bitmapqual,
+ loop_count, &indexTotalCost,
+ &tuples_fetched);
startup_cost += indexTotalCost;
+ T = (baserel->pages > 1) ? (double) baserel->pages : 1.0;
/* Fetch estimated page costs for tablespace containing table. */
get_tablespace_page_costs(baserel->reltablespace,
@@ -869,41 +831,6 @@ cost_bitmap_heap_scan(Path *path, PlannerInfo *root, RelOptInfo *baserel,
&spc_seq_page_cost);
/*
- * Estimate number of main-table pages fetched.
- */
- tuples_fetched = clamp_row_est(indexSelectivity * baserel->tuples);
-
- T = (baserel->pages > 1) ? (double) baserel->pages : 1.0;
-
- if (loop_count > 1)
- {
- /*
- * For repeated bitmap scans, scale up the number of tuples fetched in
- * the Mackert and Lohman formula by the number of scans, so that we
- * estimate the number of pages fetched by all the scans. Then
- * pro-rate for one scan.
- */
- pages_fetched = index_pages_fetched(tuples_fetched * loop_count,
- baserel->pages,
- get_indexpath_pages(bitmapqual),
- root);
- pages_fetched /= loop_count;
- }
- else
- {
- /*
- * For a single scan, the number of heap pages that need to be fetched
- * is the same as the Mackert and Lohman formula for the case T <= b
- * (ie, no re-reads needed).
- */
- pages_fetched = (2.0 * T * tuples_fetched) / (2.0 * T + tuples_fetched);
- }
- if (pages_fetched >= T)
- pages_fetched = T;
- else
- pages_fetched = ceil(pages_fetched);
-
- /*
* For small numbers of pages we should charge spc_random_page_cost
* apiece, while if nearly all the table's pages are being read, it's more
* appropriate to charge spc_seq_page_cost apiece. The effect is
@@ -932,8 +859,13 @@ cost_bitmap_heap_scan(Path *path, PlannerInfo *root, RelOptInfo *baserel,
startup_cost += qpqual_cost.startup;
cpu_per_tuple = cpu_tuple_cost + qpqual_cost.per_tuple;
+ cpu_run_cost = cpu_per_tuple * tuples_fetched;
- run_cost += cpu_per_tuple * tuples_fetched;
+ /* Adjust costing for parallelism, if used. */
+ if (path->parallel_workers > 0)
+ cpu_run_cost = adjust_cost_for_parallelism(path, cpu_run_cost);
+
+ run_cost += cpu_run_cost;
/* tlist eval costs are paid per output row, not per tuple scanned */
startup_cost += path->pathtarget->cost.startup;
@@ -944,6 +876,56 @@ cost_bitmap_heap_scan(Path *path, PlannerInfo *root, RelOptInfo *baserel,
}
/*
+ * adjust_cost_for_parallelism
+ *
+ * Adjust the CPU cost based on the number of parallel workers, and
+ * also update the number of rows processed by each worker.
+ */
+static Cost
+adjust_cost_for_parallelism(Path *path, Cost cpu_run_cost)
+{
+ double parallel_divisor = path->parallel_workers;
+ double leader_contribution;
+ Cost cpu_cost = cpu_run_cost;
+
+ /*
+ * Early experience with parallel query suggests that when there is
+ * only one worker, the leader often makes a very substantial
+ * contribution to executing the parallel portion of the plan, but as
+ * more workers are added, it does less and less, because it's busy
+ * reading tuples from the workers and doing whatever non-parallel
+ * post-processing is needed. By the time we reach 4 workers, the
+ * leader no longer makes a meaningful contribution. Thus, for now,
+ * estimate that the leader spends 30% of its time servicing each
+ * worker, and the remainder executing the parallel plan.
+ */
+ leader_contribution = 1.0 - (0.3 * path->parallel_workers);
+ if (leader_contribution > 0)
+ parallel_divisor += leader_contribution;
+
+ /*
+ * In the case of a parallel plan, the row count needs to represent
+ * the number of tuples processed per worker. Otherwise, higher-level
+ * plan nodes that appear below the gather will be costed incorrectly,
+ * because they'll anticipate receiving more rows than any given copy
+ * will actually get.
+ */
+ path->rows = clamp_row_est(path->rows / parallel_divisor);
+
+ /* The CPU cost is divided among all the workers. */
+ cpu_cost /= parallel_divisor;
+
+ /*
+ * It may be possible to amortize some of the I/O cost, but probably
+ * not very much, because most operating systems already do aggressive
+ * prefetching. For now, we assume that the disk run cost can't be
+ * amortized at all.
+ */
+
+ return cpu_cost;
+}
+
+/*
* cost_bitmap_tree_node
* Extract cost and selectivity from a bitmap tree node (index/and/or)
*/
@@ -4764,3 +4746,69 @@ page_size(double tuples, int width)
{
return ceil(relation_byte_size(tuples, width) / BLCKSZ);
}
+
+/*
+ * compute_bitmap_pages
+ *
+ * Compute the number of pages fetched from the heap in a bitmap heap scan.
+ */
+double
+compute_bitmap_pages(PlannerInfo *root, RelOptInfo *baserel, Path *bitmapqual,
+ int loop_count, Cost *cost, double *tuple)
+{
+ Cost indexTotalCost;
+ Selectivity indexSelectivity;
+ double T;
+ double pages_fetched;
+ double tuples_fetched;
+
+ /*
+ * Fetch total cost of obtaining the bitmap, as well as its total
+ * selectivity.
+ */
+ cost_bitmap_tree_node(bitmapqual, &indexTotalCost, &indexSelectivity);
+
+ /*
+ * Estimate number of main-table pages fetched.
+ */
+ tuples_fetched = clamp_row_est(indexSelectivity * baserel->tuples);
+
+ T = (baserel->pages > 1) ? (double) baserel->pages : 1.0;
+
+ if (loop_count > 1)
+ {
+ /*
+ * For repeated bitmap scans, scale up the number of tuples fetched in
+ * the Mackert and Lohman formula by the number of scans, so that we
+ * estimate the number of pages fetched by all the scans. Then
+ * pro-rate for one scan.
+ */
+ pages_fetched = index_pages_fetched(tuples_fetched * loop_count,
+ baserel->pages,
+ get_indexpath_pages(bitmapqual),
+ root);
+ pages_fetched /= loop_count;
+ }
+ else
+ {
+ /*
+ * For a single scan, the number of heap pages that need to be fetched
+ * is the same as the Mackert and Lohman formula for the case T <= b
+ * (ie, no re-reads needed).
+ */
+ pages_fetched =
+ (2.0 * T * tuples_fetched) / (2.0 * T + tuples_fetched);
+ }
+
+ if (pages_fetched >= T)
+ pages_fetched = T;
+ else
+ pages_fetched = ceil(pages_fetched);
+
+ if (cost)
+ *cost = indexTotalCost;
+ if (tuple)
+ *tuple = tuples_fetched;
+
+ return pages_fetched;
+}
diff --git a/src/backend/optimizer/path/indxpath.c b/src/backend/optimizer/path/indxpath.c
index 2952bfb..1a7bde0 100644
--- a/src/backend/optimizer/path/indxpath.c
+++ b/src/backend/optimizer/path/indxpath.c
@@ -337,8 +337,12 @@ create_index_paths(PlannerInfo *root, RelOptInfo *rel)
bitmapqual = choose_bitmap_and(root, rel, bitindexpaths);
bpath = create_bitmap_heap_path(root, rel, bitmapqual,
- rel->lateral_relids, 1.0);
+ rel->lateral_relids, 1.0, 0);
add_path(rel, (Path *) bpath);
+
+ /* create a partial bitmap heap path */
+ if (rel->consider_parallel && rel->lateral_relids == NULL)
+ create_partial_bitmap_paths(root, rel, bitmapqual);
}
/*
@@ -410,7 +414,7 @@ create_index_paths(PlannerInfo *root, RelOptInfo *rel)
required_outer = get_bitmap_tree_required_outer(bitmapqual);
loop_count = get_loop_count(root, rel->relid, required_outer);
bpath = create_bitmap_heap_path(root, rel, bitmapqual,
- required_outer, loop_count);
+ required_outer, loop_count, 0);
add_path(rel, (Path *) bpath);
}
}
diff --git a/src/backend/optimizer/plan/createplan.c b/src/backend/optimizer/plan/createplan.c
index ad49674..0b544a1 100644
--- a/src/backend/optimizer/plan/createplan.c
+++ b/src/backend/optimizer/plan/createplan.c
@@ -2803,7 +2803,7 @@ create_bitmap_subplan(PlannerInfo *root, Path *bitmapqual,
plan->plan_rows =
clamp_row_est(ipath->indexselectivity * ipath->path.parent->tuples);
plan->plan_width = 0; /* meaningless */
- plan->parallel_aware = false;
+ plan->parallel_aware = bitmapqual->parallel_aware;
*qual = get_actual_clauses(ipath->indexclauses);
*indexqual = get_actual_clauses(ipath->indexquals);
foreach(l, ipath->indexinfo->indpred)
diff --git a/src/backend/optimizer/util/pathnode.c b/src/backend/optimizer/util/pathnode.c
index 6d3ccfd..00b5c01 100644
--- a/src/backend/optimizer/util/pathnode.c
+++ b/src/backend/optimizer/util/pathnode.c
@@ -1071,7 +1071,8 @@ create_bitmap_heap_path(PlannerInfo *root,
RelOptInfo *rel,
Path *bitmapqual,
Relids required_outer,
- double loop_count)
+ double loop_count,
+ int parallel_degree)
{
BitmapHeapPath *pathnode = makeNode(BitmapHeapPath);
@@ -1080,11 +1081,10 @@ create_bitmap_heap_path(PlannerInfo *root,
pathnode->path.pathtarget = rel->reltarget;
pathnode->path.param_info = get_baserel_parampathinfo(root, rel,
required_outer);
- pathnode->path.parallel_aware = false;
+ pathnode->path.parallel_aware = parallel_degree > 0 ? true : false;
pathnode->path.parallel_safe = rel->consider_parallel;
- pathnode->path.parallel_workers = 0;
+ pathnode->path.parallel_workers = parallel_degree;
pathnode->path.pathkeys = NIL; /* always unordered */
-
pathnode->bitmapqual = bitmapqual;
cost_bitmap_heap_scan(&pathnode->path, root, rel,
@@ -3192,7 +3192,7 @@ reparameterize_path(PlannerInfo *root, Path *path,
rel,
bpath->bitmapqual,
required_outer,
- loop_count);
+ loop_count, 0);
}
case T_SubqueryScan:
{
diff --git a/src/backend/postmaster/pgstat.c b/src/backend/postmaster/pgstat.c
index a392197..80c80ab 100644
--- a/src/backend/postmaster/pgstat.c
+++ b/src/backend/postmaster/pgstat.c
@@ -3387,6 +3387,9 @@ pgstat_get_wait_ipc(WaitEventIPC w)
case WAIT_EVENT_PARALLEL_FINISH:
event_name = "ParallelFinish";
break;
+ case WAIT_EVENT_PARALLEL_BITMAP_SCAN:
+ event_name = "ParallelBitmapScan";
+ break;
case WAIT_EVENT_SAFE_SNAPSHOT:
event_name = "SafeSnapshot";
break;
diff --git a/src/include/access/heapam.h b/src/include/access/heapam.h
index 0d12bbb..ad9c693 100644
--- a/src/include/access/heapam.h
+++ b/src/include/access/heapam.h
@@ -178,6 +178,7 @@ extern void simple_heap_update(Relation relation, ItemPointer otid,
HeapTuple tup);
extern void heap_sync(Relation relation);
+extern void heap_update_snapshot(HeapScanDesc scan, Snapshot snapshot);
/* in heap/pruneheap.c */
extern void heap_page_prune_opt(Relation relation, Buffer buffer);
diff --git a/src/include/executor/nodeBitmapHeapscan.h b/src/include/executor/nodeBitmapHeapscan.h
index 0ed9c78..8afecb7 100644
--- a/src/include/executor/nodeBitmapHeapscan.h
+++ b/src/include/executor/nodeBitmapHeapscan.h
@@ -15,10 +15,17 @@
#define NODEBITMAPHEAPSCAN_H
#include "nodes/execnodes.h"
+#include "access/parallel.h"
extern BitmapHeapScanState *ExecInitBitmapHeapScan(BitmapHeapScan *node, EState *estate, int eflags);
extern TupleTableSlot *ExecBitmapHeapScan(BitmapHeapScanState *node);
extern void ExecEndBitmapHeapScan(BitmapHeapScanState *node);
extern void ExecReScanBitmapHeapScan(BitmapHeapScanState *node);
+extern void ExecBitmapHeapEstimate(BitmapHeapScanState *node,
+ ParallelContext *pcxt);
+extern void ExecBitmapHeapInitializeDSM(BitmapHeapScanState *node,
+ ParallelContext *pcxt);
+extern void ExecBitmapHeapInitializeWorker(BitmapHeapScanState *node,
+ shm_toc *toc);
#endif /* NODEBITMAPHEAPSCAN_H */
diff --git a/src/include/nodes/execnodes.h b/src/include/nodes/execnodes.h
index 7967383..e1cc891 100644
--- a/src/include/nodes/execnodes.h
+++ b/src/include/nodes/execnodes.h
@@ -26,6 +26,8 @@
#include "utils/sortsupport.h"
#include "utils/tuplestore.h"
#include "utils/tuplesort.h"
+#include "nodes/tidbitmap.h"
+#include "storage/condition_variable.h"
/* ----------------
@@ -1399,6 +1401,72 @@ typedef struct IndexOnlyScanState
long ioss_HeapFetches;
} IndexOnlyScanState;
+/*
+ * Stores the current position of the shared iterator used for
+ * parallel bitmap heap scan.
+ */
+typedef struct
+{
+ slock_t mutex; /* mutual exclusion for below three fields */
+ int spageptr; /* next spages index */
+ int schunkptr; /* next schunks index */
+ int schunkbit; /* next bit to check in current schunk */
+} ParallelIterator;
+
+/* ----------------
+ * PBMState information: current status of the TIDBitmap creation during
+ * a parallel bitmap heap scan.
+ *
+ * PBM_INITIAL TIDBitmap creation has not yet started, so the
+ * first worker to see this state becomes the
+ * leader and creates the TIDBitmap. It also sets
+ * the state to PBM_INPROGRESS.
+ * PBM_INPROGRESS TIDBitmap creation is already in progress, so
+ * workers need to sleep until the leader sets the
+ * state to PBM_FINISHED and wakes them up.
+ * PBM_FINISHED TIDBitmap creation is done, so all workers can
+ * proceed to iterate over the TIDBitmap.
+ * ----------------
+ */
+typedef enum
+{
+ PBM_INITIAL,
+ PBM_INPROGRESS,
+ PBM_FINISHED
+} PBMState;
+
+/* ----------------
+ * ParallelBitmapInfo information
+ * relid OID of relation to scan
+ * tbmiterator main iterator
+ * prefetch_mutex mutual exclusion for prefetch members
+ * (prefetch_iterator, prefetch_pages and
+ * prefetch_target)
+ * prefetch_iterator iterator for scanning ahead of current pages
+ * prefetch_pages # pages prefetch iterator is ahead of current
+ * prefetch_target current target prefetch distance
+ * state_mutex mutual exclusion for state field
+ * cv condition variable used to wait for the TIDBitmap
+ * state current state of the TIDBitmap
+ * ptbm_offset offset in bytes of ParallelTIDBitmap.
+ * phs_snapshot_data snapshot data shared to worker
+ * ----------------
+ */
+typedef struct ParallelBitmapInfo
+{
+ Oid relid;
+ ParallelIterator tbmiterator;
+ ParallelIterator prefetch_iterator;
+ slock_t prefetch_mutex;
+ int prefetch_pages;
+ int prefetch_target;
+ slock_t state_mutex;
+ ConditionVariable cv;
+ PBMState state;
+ Size ptbm_offset;
+ char phs_snapshot_data[FLEXIBLE_ARRAY_MEMBER];
+} ParallelBitmapInfo;
+
/* ----------------
* BitmapIndexScanState information
*
@@ -1413,6 +1481,8 @@ typedef struct IndexOnlyScanState
* RuntimeContext expr context for evaling runtime Skeys
* RelationDesc index relation descriptor
* ScanDesc index scan descriptor
+ * Parallel true if under a parallel bitmap heap scan, so a
+ * shared TIDBitmap must be created
* ----------------
*/
typedef struct BitmapIndexScanState
@@ -1429,6 +1499,7 @@ typedef struct BitmapIndexScanState
ExprContext *biss_RuntimeContext;
Relation biss_RelationDesc;
IndexScanDesc biss_ScanDesc;
+ bool biss_Parallel;
} BitmapIndexScanState;
/* ----------------
@@ -1444,7 +1515,11 @@ typedef struct BitmapIndexScanState
* prefetch_pages # pages prefetch iterator is ahead of current
* prefetch_target current target prefetch distance
* prefetch_maximum maximum value for prefetch_target
- * ----------------
+ * pscan_len size of the shared memory for the parallel bitmap scan
+ * is_leader true if this worker became the leader for the
+ * parallel bitmap heap scan
+ * parallel_bitmap shared memory for the parallel bitmap scan
+ * ----------------
*/
typedef struct BitmapHeapScanState
{
@@ -1459,6 +1534,9 @@ typedef struct BitmapHeapScanState
int prefetch_pages;
int prefetch_target;
int prefetch_maximum;
+ Size pscan_len;
+ bool is_leader;
+ ParallelBitmapInfo *parallel_bitmap;
} BitmapHeapScanState;
/* ----------------
diff --git a/src/include/nodes/tidbitmap.h b/src/include/nodes/tidbitmap.h
index 9303299..94e7756 100644
--- a/src/include/nodes/tidbitmap.h
+++ b/src/include/nodes/tidbitmap.h
@@ -31,9 +31,6 @@
*/
typedef struct TIDBitmap TIDBitmap;
-/* Likewise, TBMIterator is private */
-typedef struct TBMIterator TBMIterator;
-
/* Result structure for tbm_iterate */
typedef struct
{
@@ -44,6 +41,26 @@ typedef struct
OffsetNumber offsets[FLEXIBLE_ARRAY_MEMBER];
} TBMIterateResult;
+/*
+ * When iterating over a bitmap in sorted order, a TBMIterator is used to
+ * track our progress. There can be several iterators scanning the same
+ * bitmap concurrently. Note that the bitmap becomes read-only as soon as
+ * any iterator is created.
+ */
+typedef struct
+{
+ TIDBitmap *tbm; /* TIDBitmap we're iterating over */
+ int spageptr; /* next spages index */
+ int schunkptr; /* next schunks index */
+ int schunkbit; /* next bit to check in current schunk */
+ TBMIterateResult output; /* MUST BE LAST (because variable-size) */
+} TBMIterator;
+
+/*
+ * Holds shared members of TIDBitmap for parallel bitmap scan.
+ */
+typedef struct ParallelTIDBitmap ParallelTIDBitmap;
+
/* function prototypes in nodes/tidbitmap.c */
extern TIDBitmap *tbm_create(long maxbytes);
@@ -58,9 +75,14 @@ extern void tbm_union(TIDBitmap *a, const TIDBitmap *b);
extern void tbm_intersect(TIDBitmap *a, const TIDBitmap *b);
extern bool tbm_is_empty(const TIDBitmap *tbm);
-
-extern TBMIterator *tbm_begin_iterate(TIDBitmap *tbm);
+extern TBMIterator * tbm_begin_iterate(TIDBitmap *tbm);
extern TBMIterateResult *tbm_iterate(TBMIterator *iterator);
extern void tbm_end_iterate(TBMIterator *iterator);
+extern void tbm_restore_shared_members(TIDBitmap *tbm,
+ ParallelTIDBitmap *stbm);
+extern void tbm_update_shared_members(TIDBitmap *tbm,
+ ParallelTIDBitmap *parallel_tbm);
+extern void tbm_set_parallel(TIDBitmap *tbm, void *area);
+extern Size tbm_estimate_parallel_tidbitmap(Size size);
#endif /* TIDBITMAP_H */
diff --git a/src/include/optimizer/cost.h b/src/include/optimizer/cost.h
index 2a4df2f..e1baacc 100644
--- a/src/include/optimizer/cost.h
+++ b/src/include/optimizer/cost.h
@@ -183,6 +183,8 @@ extern void set_cte_size_estimates(PlannerInfo *root, RelOptInfo *rel,
double cte_rows);
extern void set_foreign_size_estimates(PlannerInfo *root, RelOptInfo *rel);
extern PathTarget *set_pathtarget_cost_width(PlannerInfo *root, PathTarget *target);
+extern double compute_bitmap_pages(PlannerInfo *root, RelOptInfo *baserel,
+ Path *bitmapqual, int loop_count, Cost *cost, double *tuple);
/*
* prototypes for clausesel.c
diff --git a/src/include/optimizer/pathnode.h b/src/include/optimizer/pathnode.h
index 71d9154..397d266 100644
--- a/src/include/optimizer/pathnode.h
+++ b/src/include/optimizer/pathnode.h
@@ -52,7 +52,8 @@ extern BitmapHeapPath *create_bitmap_heap_path(PlannerInfo *root,
RelOptInfo *rel,
Path *bitmapqual,
Relids required_outer,
- double loop_count);
+ double loop_count,
+ int parallel_degree);
extern BitmapAndPath *create_bitmap_and_path(PlannerInfo *root,
RelOptInfo *rel,
List *bitmapquals);
diff --git a/src/include/optimizer/paths.h b/src/include/optimizer/paths.h
index 44abe83..3264f44 100644
--- a/src/include/optimizer/paths.h
+++ b/src/include/optimizer/paths.h
@@ -53,6 +53,8 @@ extern RelOptInfo *standard_join_search(PlannerInfo *root, int levels_needed,
List *initial_rels);
extern void generate_gather_paths(PlannerInfo *root, RelOptInfo *rel);
+extern void create_partial_bitmap_paths(PlannerInfo *root, RelOptInfo *rel,
+ Path *bitmapqual);
#ifdef OPTIMIZER_DEBUG
extern void debug_print_rel(PlannerInfo *root, RelOptInfo *rel);
diff --git a/src/include/pgstat.h b/src/include/pgstat.h
index 0b85b7a..07ea852 100644
--- a/src/include/pgstat.h
+++ b/src/include/pgstat.h
@@ -784,6 +784,7 @@ typedef enum
WAIT_EVENT_MQ_RECEIVE,
WAIT_EVENT_MQ_SEND,
WAIT_EVENT_PARALLEL_FINISH,
+ WAIT_EVENT_PARALLEL_BITMAP_SCAN,
WAIT_EVENT_SAFE_SNAPSHOT,
WAIT_EVENT_SYNC_REP
} WaitEventIPC;