diff --git a/doc/src/sgml/monitoring.sgml b/doc/src/sgml/monitoring.sgml
index 5b67def..3b6a6dc 100644
--- a/doc/src/sgml/monitoring.sgml
+++ b/doc/src/sgml/monitoring.sgml
@@ -1207,7 +1207,7 @@ postgres 27093 0.0 0.0 30096 2752 ? Ss 11:34 0:00 postgres: ser
Waiting in an extension.
- IPC>
+ IPC>
BgWorkerShutdown>
Waiting for background worker to shut down.
@@ -1240,6 +1240,11 @@ postgres 27093 0.0 0.0 30096 2752 ? Ss 11:34 0:00 postgres: ser
Waiting for parallel workers to finish computing.
+ ParallelBtreePage>
+ Waiting for the page number needed to continue a parallel btree scan
+ to become available.
+
+
SafeSnapshot>
Waiting for a snapshot for a READ ONLY DEFERRABLE> transaction.
diff --git a/src/backend/access/nbtree/nbtree.c b/src/backend/access/nbtree/nbtree.c
index 945e563..b707fe2 100644
--- a/src/backend/access/nbtree/nbtree.c
+++ b/src/backend/access/nbtree/nbtree.c
@@ -23,6 +23,8 @@
#include "access/xlog.h"
#include "catalog/index.h"
#include "commands/vacuum.h"
+#include "pgstat.h"
+#include "storage/condition_variable.h"
#include "storage/indexfsm.h"
#include "storage/ipc.h"
#include "storage/lmgr.h"
@@ -63,6 +65,44 @@ typedef struct
MemoryContext pagedelcontext;
} BTVacState;
+/*
+ * BTPARALLEL_NOT_INITIALIZED implies that the scan is not started.
+ *
+ * BTPARALLEL_ADVANCING implies one of the worker or backend is advancing the
+ * scan to a new page; others must wait.
+ *
+ * BTPARALLEL_IDLE implies that no backend is advancing the scan; someone can
+ * start doing it.
+ *
+ * BTPARALLEL_DONE implies that the scan is complete (including error exit).
+ */
+typedef enum
+{
+ BTPARALLEL_NOT_INITIALIZED,
+ BTPARALLEL_ADVANCING,
+ BTPARALLEL_IDLE,
+ BTPARALLEL_DONE
+} BTPS_State;
+
+/*
+ * BTParallelScanDescData contains btree specific shared information required
+ * for parallel scan.
+ */
+typedef struct BTParallelScanDescData
+{
+ BlockNumber btps_scanPage; /* latest or next page to be scanned */
+ BTPS_State btps_pageStatus;/* indicates whether next page is available
+ * for scan. see above for possible states of
+ * parallel scan. */
+ int btps_arrayKeyCount; /* count indicating number of array
+ * scan keys processed by parallel
+ * scan */
+ slock_t btps_mutex; /* protects above variables */
+ ConditionVariable btps_cv; /* used to synchronize parallel scan */
+} BTParallelScanDescData;
+
+typedef struct BTParallelScanDescData *BTParallelScanDesc;
+
static void btbuildCallback(Relation index,
HeapTuple htup,
@@ -118,9 +158,9 @@ bthandler(PG_FUNCTION_ARGS)
amroutine->amendscan = btendscan;
amroutine->ammarkpos = btmarkpos;
amroutine->amrestrpos = btrestrpos;
- amroutine->amestimateparallelscan = NULL;
- amroutine->aminitparallelscan = NULL;
- amroutine->amparallelrescan = NULL;
+ amroutine->amestimateparallelscan = btestimateparallelscan;
+ amroutine->aminitparallelscan = btinitparallelscan;
+ amroutine->amparallelrescan = btparallelrescan;
PG_RETURN_POINTER(amroutine);
}
@@ -491,6 +531,7 @@ btrescan(IndexScanDesc scan, ScanKey scankey, int nscankeys,
}
so->markItemIndex = -1;
+ so->arrayKeyCount = 0;
BTScanPosUnpinIfPinned(so->markPos);
BTScanPosInvalidate(so->markPos);
@@ -653,6 +694,215 @@ btrestrpos(IndexScanDesc scan)
}
/*
+ * btestimateparallelscan -- estimate storage for BTParallelScanDescData
+ */
+Size
+btestimateparallelscan(void)
+{
+ return sizeof(BTParallelScanDescData);
+}
+
+/*
+ * btinitparallelscan -- initialize BTParallelScanDesc for parallel btree scan
+ */
+void
+btinitparallelscan(void *target)
+{
+ BTParallelScanDesc bt_target = (BTParallelScanDesc) target;
+
+ SpinLockInit(&bt_target->btps_mutex);
+ bt_target->btps_scanPage = InvalidBlockNumber;
+ bt_target->btps_pageStatus = BTPARALLEL_NOT_INITIALIZED;
+ bt_target->btps_arrayKeyCount = 0;
+ ConditionVariableInit(&bt_target->btps_cv);
+}
+
+/*
+ * btparallelrescan() -- reset parallel scan
+ */
+void
+btparallelrescan(IndexScanDesc scan)
+{
+ BTParallelScanDesc btscan;
+ ParallelIndexScanDesc parallel_scan = scan->parallel_scan;
+
+ Assert(parallel_scan);
+
+ btscan = (BTParallelScanDesc) OffsetToPointer((void *) parallel_scan,
+ parallel_scan->ps_offset);
+
+ /*
+ * In theory, we don't need to acquire the spinlock here, because there
+ * shouldn't be any other workers running at this point, but we do so for
+ * consistency.
+ */
+ SpinLockAcquire(&btscan->btps_mutex);
+ btscan->btps_scanPage = InvalidBlockNumber;
+ btscan->btps_pageStatus = BTPARALLEL_NOT_INITIALIZED;
+ btscan->btps_arrayKeyCount = 0;
+ SpinLockRelease(&btscan->btps_mutex);
+}
+
+/*
+ * _bt_parallel_seize() -- Begin the process of advancing the scan to a new
+ * page. Other scans must wait until we call bt_parallel_release() or
+ * bt_parallel_done().
+ *
+ * The return value tells caller whether to continue the scan or not. The
+ * true value indicates either one of the following (a) the block number
+ * returned is valid and the scan can be continued (b) the block number is
+ * invalid and the scan has just begun (c) the block number is P_NONE and the
+ * scan is finished. The false value indicates that we have reached the end
+ * of scan for current scankeys and for that we return block number as P_NONE.
+ *
+ * The first time master backend or worker hits last page, it will return
+ * P_NONE and status as 'True', after that any worker tries to fetch next
+ * page, it will return status as 'False'.
+ *
+ * Callers ignore the value of pageno, if false is returned.
+ */
+bool
+_bt_parallel_seize(IndexScanDesc scan, BlockNumber *pageno)
+{
+ BTScanOpaque so = (BTScanOpaque) scan->opaque;
+ BTPS_State pageStatus;
+ bool exit_loop = false;
+ bool status = true;
+ ParallelIndexScanDesc parallel_scan = scan->parallel_scan;
+ BTParallelScanDesc btscan;
+
+ btscan = (BTParallelScanDesc) OffsetToPointer((void *) parallel_scan,
+ parallel_scan->ps_offset);
+
+ while (1)
+ {
+ /*
+ * Fetch the next block to scan and update the page status so that
+ * other participants of parallel scan can wait till next page is
+ * available for scan. Set the status as false, if scan is finished.
+ */
+ SpinLockAcquire(&btscan->btps_mutex);
+ pageStatus = btscan->btps_pageStatus;
+
+ /* Check if the scan for current scan keys is finished */
+ if (so->arrayKeyCount < btscan->btps_arrayKeyCount)
+ status = false;
+ else if (pageStatus == BTPARALLEL_DONE)
+ status = false;
+ else if (pageStatus != BTPARALLEL_ADVANCING)
+ {
+ btscan->btps_pageStatus = BTPARALLEL_ADVANCING;
+ *pageno = btscan->btps_scanPage;
+ exit_loop = true;
+ }
+ SpinLockRelease(&btscan->btps_mutex);
+ if (exit_loop || !status)
+ break;
+ ConditionVariableSleep(&btscan->btps_cv, WAIT_EVENT_BTREE_PAGE);
+ }
+ ConditionVariableCancelSleep();
+
+ /* no more pages to scan */
+ if (!status)
+ *pageno = P_NONE;
+
+ return status;
+}
+
+/*
+ * _bt_parallel_release() -- Complete the process of advancing the scan to a
+ * new page. We now have the new value btps_scanPage; some other backend
+ * can now begin advancing the scan.
+ *
+ * scan_page - For backward scans, it holds the latest page being scanned
+ * and for forward scans, it holds the next page to be scanned.
+ */
+void
+_bt_parallel_release(IndexScanDesc scan, BlockNumber scan_page)
+{
+ ParallelIndexScanDesc parallel_scan = scan->parallel_scan;
+ BTParallelScanDesc btscan;
+
+ btscan = (BTParallelScanDesc) OffsetToPointer((void *) parallel_scan,
+ parallel_scan->ps_offset);
+
+ SpinLockAcquire(&btscan->btps_mutex);
+ btscan->btps_scanPage = scan_page;
+ btscan->btps_pageStatus = BTPARALLEL_IDLE;
+ SpinLockRelease(&btscan->btps_mutex);
+ ConditionVariableSignal(&btscan->btps_cv);
+}
+
+/*
+ * _bt_parallel_done() -- Mark the parallel scan as complete.
+ *
+ * When there are no pages left to scan, this function should be called to
+ * notify other workers. Otherwise, they might wait forever for the scan to
+ * advance to the next page.
+ */
+void
+_bt_parallel_done(IndexScanDesc scan)
+{
+ BTScanOpaque so = (BTScanOpaque) scan->opaque;
+ ParallelIndexScanDesc parallel_scan = scan->parallel_scan;
+ BTParallelScanDesc btscan;
+ bool status_changed = false;
+
+ /* Do nothing, for non-parallel scans */
+ if (parallel_scan == NULL)
+ return;
+
+ btscan = (BTParallelScanDesc) OffsetToPointer((void *) parallel_scan,
+ parallel_scan->ps_offset);
+
+ /*
+ * Ensure to mark parallel scan as done no more than once for single scan.
+ * We rely on this state to initiate the next scan for multiple array
+ * keys, see _bt_advance_array_keys.
+ */
+ SpinLockAcquire(&btscan->btps_mutex);
+ if (so->arrayKeyCount >= btscan->btps_arrayKeyCount &&
+ btscan->btps_pageStatus != BTPARALLEL_DONE)
+ {
+ btscan->btps_pageStatus = BTPARALLEL_DONE;
+ status_changed = true;
+ }
+ SpinLockRelease(&btscan->btps_mutex);
+
+ /* wake up all the workers associated with this parallel scan */
+ if (status_changed)
+ ConditionVariableBroadcast(&btscan->btps_cv);
+}
+
+/*
+ * _bt_parallel_advance_array_keys() -- Advances the parallel scan for array
+ * keys.
+ *
+ * Updates the count of array keys processed for both local and parallel
+ * scans.
+ */
+void
+_bt_parallel_advance_array_keys(IndexScanDesc scan)
+{
+ BTScanOpaque so = (BTScanOpaque) scan->opaque;
+ ParallelIndexScanDesc parallel_scan = scan->parallel_scan;
+ BTParallelScanDesc btscan;
+
+ btscan = (BTParallelScanDesc) OffsetToPointer((void *) parallel_scan,
+ parallel_scan->ps_offset);
+
+ so->arrayKeyCount++;
+ SpinLockAcquire(&btscan->btps_mutex);
+ if (btscan->btps_pageStatus == BTPARALLEL_DONE)
+ {
+ btscan->btps_scanPage = InvalidBlockNumber;
+ btscan->btps_pageStatus = BTPARALLEL_NOT_INITIALIZED;
+ btscan->btps_arrayKeyCount++;
+ }
+ SpinLockRelease(&btscan->btps_mutex);
+}
+
+/*
* Bulk deletion of all index entries pointing to a set of heap tuples.
* The set of target tuples is specified via a callback routine that tells
* whether any given heap tuple (identified by ItemPointer) is being deleted.
diff --git a/src/backend/access/nbtree/nbtsearch.c b/src/backend/access/nbtree/nbtsearch.c
index b6459d2..a211c35 100644
--- a/src/backend/access/nbtree/nbtsearch.c
+++ b/src/backend/access/nbtree/nbtsearch.c
@@ -30,9 +30,12 @@ static bool _bt_readpage(IndexScanDesc scan, ScanDirection dir,
static void _bt_saveitem(BTScanOpaque so, int itemIndex,
OffsetNumber offnum, IndexTuple itup);
static bool _bt_steppage(IndexScanDesc scan, ScanDirection dir);
+static bool _bt_readnextpage(IndexScanDesc scan, BlockNumber blkno, ScanDirection dir);
+static bool _bt_parallel_readpage(IndexScanDesc scan, BlockNumber blkno, ScanDirection dir);
static Buffer _bt_walk_left(Relation rel, Buffer buf, Snapshot snapshot);
static bool _bt_endpoint(IndexScanDesc scan, ScanDirection dir);
static void _bt_drop_lock_and_maybe_pin(IndexScanDesc scan, BTScanPos sp);
+static inline void _bt_initialize_more_data(BTScanOpaque so, ScanDirection dir);
/*
@@ -544,8 +547,10 @@ _bt_first(IndexScanDesc scan, ScanDirection dir)
ScanKeyData notnullkeys[INDEX_MAX_KEYS];
int keysCount = 0;
int i;
+ bool status = true;
StrategyNumber strat_total;
BTScanPosItem *currItem;
+ BlockNumber blkno;
Assert(!BTScanPosIsValid(so->currPos));
@@ -564,6 +569,38 @@ _bt_first(IndexScanDesc scan, ScanDirection dir)
if (!so->qual_ok)
return false;
+ /*
+ * For parallel scans, get the page from shared state. If scan has not
+ * started, proceed to find out first leaf page to scan by keeping other
+ * workers waiting until we have descended to appropriate leaf page to be
+ * scanned for matching tuples.
+ *
+ * If the scan has already begun, skip finding the first leaf page and
+ * directly scanning the page stored in shared structure or the page to
+ * its left in case of backward scan.
+ */
+ if (scan->parallel_scan != NULL)
+ {
+ status = _bt_parallel_seize(scan, &blkno);
+ if (!status)
+ {
+ BTScanPosInvalidate(so->currPos);
+ return false;
+ }
+ else if (blkno == P_NONE)
+ {
+ _bt_parallel_done(scan);
+ BTScanPosInvalidate(so->currPos);
+ return false;
+ }
+ else if (blkno != InvalidBlockNumber)
+ {
+ if (!_bt_parallel_readpage(scan, blkno, dir))
+ return false;
+ goto readcomplete;
+ }
+ }
+
/*----------
* Examine the scan keys to discover where we need to start the scan.
*
@@ -743,7 +780,19 @@ _bt_first(IndexScanDesc scan, ScanDirection dir)
* there.
*/
if (keysCount == 0)
- return _bt_endpoint(scan, dir);
+ {
+ bool match;
+
+ match = _bt_endpoint(scan, dir);
+ if (!match)
+ {
+ /* No match , indicate (parallel) scan finished */
+ _bt_parallel_done(scan);
+ BTScanPosInvalidate(so->currPos);
+ }
+
+ return match;
+ }
/*
* We want to start the scan somewhere within the index. Set up an
@@ -993,25 +1042,21 @@ _bt_first(IndexScanDesc scan, ScanDirection dir)
* because nothing finer to lock exists.
*/
PredicateLockRelation(rel, scan->xs_snapshot);
+
+ /*
+ * mark parallel scan as done, so that all the workers can finish
+ * their scan
+ */
+ _bt_parallel_done(scan);
+ BTScanPosInvalidate(so->currPos);
+
return false;
}
else
PredicateLockPage(rel, BufferGetBlockNumber(buf),
scan->xs_snapshot);
- /* initialize moreLeft/moreRight appropriately for scan direction */
- if (ScanDirectionIsForward(dir))
- {
- so->currPos.moreLeft = false;
- so->currPos.moreRight = true;
- }
- else
- {
- so->currPos.moreLeft = true;
- so->currPos.moreRight = false;
- }
- so->numKilled = 0; /* just paranoia */
- Assert(so->markItemIndex == -1);
+ _bt_initialize_more_data(so, dir);
/* position to the precise item on the page */
offnum = _bt_binsrch(rel, buf, keysCount, scankeys, nextkey);
@@ -1060,6 +1105,7 @@ _bt_first(IndexScanDesc scan, ScanDirection dir)
_bt_drop_lock_and_maybe_pin(scan, &so->currPos);
}
+readcomplete:
/* OK, itemIndex says what to return */
currItem = &so->currPos.items[so->currPos.itemIndex];
scan->xs_ctup.t_self = currItem->heapTid;
@@ -1132,6 +1178,10 @@ _bt_next(IndexScanDesc scan, ScanDirection dir)
* moreLeft or moreRight (as appropriate) is cleared if _bt_checkkeys reports
* that there can be no more matching tuples in the current scan direction.
*
+ * In the case of parallel scans, caller must have called _bt_parallel_seize
+ * and _bt_parallel_release will be called in this function to advance the
+ * parallel scan.
+ *
* Returns true if any matching items found on the page, false if none.
*/
static bool
@@ -1154,6 +1204,16 @@ _bt_readpage(IndexScanDesc scan, ScanDirection dir, OffsetNumber offnum)
page = BufferGetPage(so->currPos.buf);
opaque = (BTPageOpaque) PageGetSpecialPointer(page);
+
+ /* allow next page be processed by parallel worker */
+ if (scan->parallel_scan)
+ {
+ if (ScanDirectionIsForward(dir))
+ _bt_parallel_release(scan, opaque->btpo_next);
+ else
+ _bt_parallel_release(scan, BufferGetBlockNumber(so->currPos.buf));
+ }
+
minoff = P_FIRSTDATAKEY(opaque);
maxoff = PageGetMaxOffsetNumber(page);
@@ -1278,21 +1338,16 @@ _bt_saveitem(BTScanOpaque so, int itemIndex,
* if pinned, we'll drop the pin before moving to next page. The buffer is
* not locked on entry.
*
- * On success exit, so->currPos is updated to contain data from the next
- * interesting page. For success on a scan using a non-MVCC snapshot we hold
- * a pin, but not a read lock, on that page. If we do not hold the pin, we
- * set so->currPos.buf to InvalidBuffer. We return TRUE to indicate success.
- *
- * If there are no more matching records in the given direction, we drop all
- * locks and pins, set so->currPos.buf to InvalidBuffer, and return FALSE.
+ * For success on a scan using a non-MVCC snapshot we hold a pin, but not a
+ * read lock, on that page. If we do not hold the pin, we set so->currPos.buf
+ * to InvalidBuffer. We return TRUE to indicate success.
*/
static bool
_bt_steppage(IndexScanDesc scan, ScanDirection dir)
{
BTScanOpaque so = (BTScanOpaque) scan->opaque;
- Relation rel;
- Page page;
- BTPageOpaque opaque;
+ BlockNumber blkno = InvalidBlockNumber;
+ bool status = true;
Assert(BTScanPosIsValid(so->currPos));
@@ -1319,13 +1374,27 @@ _bt_steppage(IndexScanDesc scan, ScanDirection dir)
so->markItemIndex = -1;
}
- rel = scan->indexRelation;
-
if (ScanDirectionIsForward(dir))
{
/* Walk right to the next page with data */
- /* We must rely on the previously saved nextPage link! */
- BlockNumber blkno = so->currPos.nextPage;
+
+ /*
+ * We must rely on the previously saved nextPage link for non-parallel
+ * scans!
+ */
+ if (scan->parallel_scan != NULL)
+ {
+ status = _bt_parallel_seize(scan, &blkno);
+ if (!status)
+ {
+ /* release the previous buffer, if pinned */
+ BTScanPosUnpinIfPinned(so->currPos);
+ BTScanPosInvalidate(so->currPos);
+ return false;
+ }
+ }
+ else
+ blkno = so->currPos.nextPage;
/* Remember we left a page with data */
so->currPos.moreLeft = true;
@@ -1333,11 +1402,72 @@ _bt_steppage(IndexScanDesc scan, ScanDirection dir)
/* release the previous buffer, if pinned */
BTScanPosUnpinIfPinned(so->currPos);
+ if (!_bt_readnextpage(scan, blkno, dir))
+ return false;
+ }
+ else
+ {
+ /* Remember we left a page with data */
+ so->currPos.moreRight = true;
+
+ /* For parallel scans, get the last page scanned */
+ if (scan->parallel_scan != NULL)
+ {
+ status = _bt_parallel_seize(scan, &blkno);
+ BTScanPosUnpinIfPinned(so->currPos);
+ if (!status)
+ {
+ BTScanPosInvalidate(so->currPos);
+ return false;
+ }
+ }
+
+ if (!_bt_readnextpage(scan, blkno, dir))
+ return false;
+ }
+
+ /* Drop the lock, and maybe the pin, on the current page */
+ _bt_drop_lock_and_maybe_pin(scan, &so->currPos);
+
+ return true;
+}
+
+/*
+ * _bt_readnextpage() -- Read next page containing valid data for scan
+ *
+ * On success exit, so->currPos is updated to contain data from the next
+ * interesting page. Caller is responsible to release lock and pin on
+ * buffer on success. We return TRUE to indicate success.
+ *
+ * If there are no more matching records in the given direction, we drop all
+ * locks and pins, set so->currPos.buf to InvalidBuffer, and return FALSE.
+ *
+ * Callers pass valid blkno for forward and backward scans with an exception
+ * for backward scans in which case so->currPos is expected to contain a valid
+ * value.
+ */
+static bool
+_bt_readnextpage(IndexScanDesc scan, BlockNumber blkno, ScanDirection dir)
+{
+ BTScanOpaque so = (BTScanOpaque) scan->opaque;
+ Relation rel;
+ Page page;
+ BTPageOpaque opaque;
+ bool status = true;
+
+ rel = scan->indexRelation;
+
+ if (ScanDirectionIsForward(dir))
+ {
for (;;)
{
- /* if we're at end of scan, give up */
+ /*
+ * if we're at end of scan, give up and mark parallel scan as
+ * done, so that all the workers can finish their scan
+ */
if (blkno == P_NONE || !so->currPos.moreRight)
{
+ _bt_parallel_done(scan);
BTScanPosInvalidate(so->currPos);
return false;
}
@@ -1359,14 +1489,30 @@ _bt_steppage(IndexScanDesc scan, ScanDirection dir)
}
/* nope, keep going */
- blkno = opaque->btpo_next;
+ if (scan->parallel_scan != NULL)
+ {
+ status = _bt_parallel_seize(scan, &blkno);
+ if (!status)
+ {
+ _bt_relbuf(rel, so->currPos.buf);
+ BTScanPosInvalidate(so->currPos);
+ return false;
+ }
+ }
+ else
+ blkno = opaque->btpo_next;
_bt_relbuf(rel, so->currPos.buf);
}
}
else
{
- /* Remember we left a page with data */
- so->currPos.moreRight = true;
+ /*
+ * for parallel scans, current block number needs to be retrieved from
+ * shared state and it is the responsibility of caller to pass the
+ * correct block number.
+ */
+ if (blkno != InvalidBlockNumber)
+ so->currPos.currPage = blkno;
/*
* Walk left to the next page with data. This is much more complex
@@ -1401,6 +1547,12 @@ _bt_steppage(IndexScanDesc scan, ScanDirection dir)
if (!so->currPos.moreLeft)
{
_bt_relbuf(rel, so->currPos.buf);
+
+ /*
+ * mark parallel scan as done, so that all the workers can
+ * finish their scan
+ */
+ _bt_parallel_done(scan);
BTScanPosInvalidate(so->currPos);
return false;
}
@@ -1412,6 +1564,7 @@ _bt_steppage(IndexScanDesc scan, ScanDirection dir)
/* if we're physically at end of index, return failure */
if (so->currPos.buf == InvalidBuffer)
{
+ _bt_parallel_done(scan);
BTScanPosInvalidate(so->currPos);
return false;
}
@@ -1432,9 +1585,46 @@ _bt_steppage(IndexScanDesc scan, ScanDirection dir)
if (_bt_readpage(scan, dir, PageGetMaxOffsetNumber(page)))
break;
}
+
+ /*
+ * For parallel scans, get the last page scanned as it is quite
+ * possible that by the time we try to fetch previous page, other
+ * worker has also decided to scan that previous page. So we
+ * can't rely on _bt_walk_left call.
+ */
+ if (scan->parallel_scan != NULL)
+ {
+ _bt_relbuf(rel, so->currPos.buf);
+ status = _bt_parallel_seize(scan, &blkno);
+ if (!status)
+ {
+ BTScanPosInvalidate(so->currPos);
+ return false;
+ }
+ so->currPos.buf = _bt_getbuf(rel, blkno, BT_READ);
+ }
}
}
+ return true;
+}
+
+/*
+ * _bt_parallel_readpage() -- Read current page containing valid data for scan
+ *
+ * On success, release lock and pin on buffer. We return TRUE to indicate
+ * success.
+ */
+static bool
+_bt_parallel_readpage(IndexScanDesc scan, BlockNumber blkno, ScanDirection dir)
+{
+ BTScanOpaque so = (BTScanOpaque) scan->opaque;
+
+ _bt_initialize_more_data(so, dir);
+
+ if (!_bt_readnextpage(scan, blkno, dir))
+ return false;
+
/* Drop the lock, and maybe the pin, on the current page */
_bt_drop_lock_and_maybe_pin(scan, &so->currPos);
@@ -1712,19 +1902,7 @@ _bt_endpoint(IndexScanDesc scan, ScanDirection dir)
/* remember which buffer we have pinned */
so->currPos.buf = buf;
- /* initialize moreLeft/moreRight appropriately for scan direction */
- if (ScanDirectionIsForward(dir))
- {
- so->currPos.moreLeft = false;
- so->currPos.moreRight = true;
- }
- else
- {
- so->currPos.moreLeft = true;
- so->currPos.moreRight = false;
- }
- so->numKilled = 0; /* just paranoia */
- so->markItemIndex = -1; /* ditto */
+ _bt_initialize_more_data(so, dir);
/*
* Now load data from the first page of the scan.
@@ -1753,3 +1931,25 @@ _bt_endpoint(IndexScanDesc scan, ScanDirection dir)
return true;
}
+
+/*
+ * _bt_initialize_more_data() -- initialize moreLeft/moreRight appropriately
+ * for scan direction
+ */
+static inline void
+_bt_initialize_more_data(BTScanOpaque so, ScanDirection dir)
+{
+ /* initialize moreLeft/moreRight appropriately for scan direction */
+ if (ScanDirectionIsForward(dir))
+ {
+ so->currPos.moreLeft = false;
+ so->currPos.moreRight = true;
+ }
+ else
+ {
+ so->currPos.moreLeft = true;
+ so->currPos.moreRight = false;
+ }
+ so->numKilled = 0; /* just paranoia */
+ so->markItemIndex = -1; /* ditto */
+}
diff --git a/src/backend/access/nbtree/nbtutils.c b/src/backend/access/nbtree/nbtutils.c
index da0f330..5b259a3 100644
--- a/src/backend/access/nbtree/nbtutils.c
+++ b/src/backend/access/nbtree/nbtutils.c
@@ -590,6 +590,10 @@ _bt_advance_array_keys(IndexScanDesc scan, ScanDirection dir)
break;
}
+ /* advance parallel scan */
+ if (scan->parallel_scan != NULL)
+ _bt_parallel_advance_array_keys(scan);
+
return found;
}
diff --git a/src/backend/postmaster/pgstat.c b/src/backend/postmaster/pgstat.c
index 7176cf1..92af6ec 100644
--- a/src/backend/postmaster/pgstat.c
+++ b/src/backend/postmaster/pgstat.c
@@ -3392,6 +3392,9 @@ pgstat_get_wait_ipc(WaitEventIPC w)
case WAIT_EVENT_PARALLEL_FINISH:
event_name = "ParallelFinish";
break;
+ case WAIT_EVENT_BTREE_PAGE:
+ event_name = "ParallelBtreePage";
+ break;
case WAIT_EVENT_SAFE_SNAPSHOT:
event_name = "SafeSnapshot";
break;
diff --git a/src/include/access/nbtree.h b/src/include/access/nbtree.h
index 344ef99..4c01a2f 100644
--- a/src/include/access/nbtree.h
+++ b/src/include/access/nbtree.h
@@ -609,6 +609,8 @@ typedef struct BTScanOpaqueData
ScanKey arrayKeyData; /* modified copy of scan->keyData */
int numArrayKeys; /* number of equality-type array keys (-1 if
* there are any unsatisfiable array keys) */
+ int arrayKeyCount; /* count indicating number of array scan keys
+ * processed */
BTArrayKeyInfo *arrayKeys; /* info about each equality-type array key */
MemoryContext arrayContext; /* scan-lifespan context for array data */
@@ -652,7 +654,8 @@ typedef BTScanOpaqueData *BTScanOpaque;
#define SK_BT_NULLS_FIRST (INDOPTION_NULLS_FIRST << SK_BT_INDOPTION_SHIFT)
/*
- * prototypes for functions in nbtree.c (external entry points for btree)
+ * prototypes for functions in nbtree.c (external entry points for btree and
+ * functions to maintain state of parallel scan)
*/
extern IndexBuildResult *btbuild(Relation heap, Relation index,
struct IndexInfo *indexInfo);
@@ -662,10 +665,17 @@ extern bool btinsert(Relation rel, Datum *values, bool *isnull,
IndexUniqueCheck checkUnique,
struct IndexInfo *indexInfo);
extern IndexScanDesc btbeginscan(Relation rel, int nkeys, int norderbys);
+extern Size btestimateparallelscan(void);
+extern void btinitparallelscan(void *target);
+extern bool _bt_parallel_seize(IndexScanDesc scan, BlockNumber *pageno);
+extern void _bt_parallel_release(IndexScanDesc scan, BlockNumber scan_page);
+extern void _bt_parallel_done(IndexScanDesc scan);
+extern void _bt_parallel_advance_array_keys(IndexScanDesc scan);
extern bool btgettuple(IndexScanDesc scan, ScanDirection dir);
extern int64 btgetbitmap(IndexScanDesc scan, TIDBitmap *tbm);
extern void btrescan(IndexScanDesc scan, ScanKey scankey, int nscankeys,
ScanKey orderbys, int norderbys);
+extern void btparallelrescan(IndexScanDesc scan);
extern void btendscan(IndexScanDesc scan);
extern void btmarkpos(IndexScanDesc scan);
extern void btrestrpos(IndexScanDesc scan);
diff --git a/src/include/pgstat.h b/src/include/pgstat.h
index de8225b..915cc85 100644
--- a/src/include/pgstat.h
+++ b/src/include/pgstat.h
@@ -786,6 +786,7 @@ typedef enum
WAIT_EVENT_MQ_RECEIVE,
WAIT_EVENT_MQ_SEND,
WAIT_EVENT_PARALLEL_FINISH,
+ WAIT_EVENT_BTREE_PAGE,
WAIT_EVENT_SAFE_SNAPSHOT,
WAIT_EVENT_SYNC_REP
} WaitEventIPC;
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index c4235ae..9f876ae 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -161,6 +161,9 @@ BTPageOpaque
BTPageOpaqueData
BTPageStat
BTPageState
+BTParallelScanDesc
+BTParallelScanDescData
+BTPS_State
BTScanOpaque
BTScanOpaqueData
BTScanPos