diff --git a/doc/src/sgml/monitoring.sgml b/doc/src/sgml/monitoring.sgml index 5b67def..3b6a6dc 100644 --- a/doc/src/sgml/monitoring.sgml +++ b/doc/src/sgml/monitoring.sgml @@ -1207,7 +1207,7 @@ postgres 27093 0.0 0.0 30096 2752 ? Ss 11:34 0:00 postgres: ser Waiting in an extension. - IPC + IPC BgWorkerShutdown Waiting for background worker to shut down. @@ -1240,6 +1240,11 @@ postgres 27093 0.0 0.0 30096 2752 ? Ss 11:34 0:00 postgres: ser Waiting for parallel workers to finish computing. + ParallelBtreePage + Waiting for the page number needed to continue a parallel btree scan + to become available. + + SafeSnapshot Waiting for a snapshot for a READ ONLY DEFERRABLE transaction. diff --git a/src/backend/access/nbtree/nbtree.c b/src/backend/access/nbtree/nbtree.c index 945e563..b707fe2 100644 --- a/src/backend/access/nbtree/nbtree.c +++ b/src/backend/access/nbtree/nbtree.c @@ -23,6 +23,8 @@ #include "access/xlog.h" #include "catalog/index.h" #include "commands/vacuum.h" +#include "pgstat.h" +#include "storage/condition_variable.h" #include "storage/indexfsm.h" #include "storage/ipc.h" #include "storage/lmgr.h" @@ -63,6 +65,44 @@ typedef struct MemoryContext pagedelcontext; } BTVacState; +/* + * BTPARALLEL_NOT_INITIALIZED implies that the scan is not started. + * + * BTPARALLEL_ADVANCING implies one of the worker or backend is advancing the + * scan to a new page; others must wait. + * + * BTPARALLEL_IDLE implies that no backend is advancing the scan; someone can + * start doing it. + * + * BTPARALLEL_DONE implies that the scan is complete (including error exit). + */ +typedef enum +{ + BTPARALLEL_NOT_INITIALIZED, + BTPARALLEL_ADVANCING, + BTPARALLEL_IDLE, + BTPARALLEL_DONE +} BTPS_State; + +/* + * BTParallelScanDescData contains btree specific shared information required + * for parallel scan. + */ +typedef struct BTParallelScanDescData +{ + BlockNumber btps_scanPage; /* latest or next page to be scanned */ + BTPS_State btps_pageStatus;/* indicates whether next page is available + * for scan. see above for possible states of + * parallel scan. */ + int btps_arrayKeyCount; /* count indicating number of array + * scan keys processed by parallel + * scan */ + slock_t btps_mutex; /* protects above variables */ + ConditionVariable btps_cv; /* used to synchronize parallel scan */ +} BTParallelScanDescData; + +typedef struct BTParallelScanDescData *BTParallelScanDesc; + static void btbuildCallback(Relation index, HeapTuple htup, @@ -118,9 +158,9 @@ bthandler(PG_FUNCTION_ARGS) amroutine->amendscan = btendscan; amroutine->ammarkpos = btmarkpos; amroutine->amrestrpos = btrestrpos; - amroutine->amestimateparallelscan = NULL; - amroutine->aminitparallelscan = NULL; - amroutine->amparallelrescan = NULL; + amroutine->amestimateparallelscan = btestimateparallelscan; + amroutine->aminitparallelscan = btinitparallelscan; + amroutine->amparallelrescan = btparallelrescan; PG_RETURN_POINTER(amroutine); } @@ -491,6 +531,7 @@ btrescan(IndexScanDesc scan, ScanKey scankey, int nscankeys, } so->markItemIndex = -1; + so->arrayKeyCount = 0; BTScanPosUnpinIfPinned(so->markPos); BTScanPosInvalidate(so->markPos); @@ -653,6 +694,215 @@ btrestrpos(IndexScanDesc scan) } /* + * btestimateparallelscan -- estimate storage for BTParallelScanDescData + */ +Size +btestimateparallelscan(void) +{ + return sizeof(BTParallelScanDescData); +} + +/* + * btinitparallelscan -- initialize BTParallelScanDesc for parallel btree scan + */ +void +btinitparallelscan(void *target) +{ + BTParallelScanDesc bt_target = (BTParallelScanDesc) target; + + SpinLockInit(&bt_target->btps_mutex); + bt_target->btps_scanPage = InvalidBlockNumber; + bt_target->btps_pageStatus = BTPARALLEL_NOT_INITIALIZED; + bt_target->btps_arrayKeyCount = 0; + ConditionVariableInit(&bt_target->btps_cv); +} + +/* + * btparallelrescan() -- reset parallel scan + */ +void +btparallelrescan(IndexScanDesc scan) +{ + BTParallelScanDesc btscan; + ParallelIndexScanDesc parallel_scan = scan->parallel_scan; + + Assert(parallel_scan); + + btscan = (BTParallelScanDesc) OffsetToPointer((void *) parallel_scan, + parallel_scan->ps_offset); + + /* + * In theory, we don't need to acquire the spinlock here, because there + * shouldn't be any other workers running at this point, but we do so for + * consistency. + */ + SpinLockAcquire(&btscan->btps_mutex); + btscan->btps_scanPage = InvalidBlockNumber; + btscan->btps_pageStatus = BTPARALLEL_NOT_INITIALIZED; + btscan->btps_arrayKeyCount = 0; + SpinLockRelease(&btscan->btps_mutex); +} + +/* + * _bt_parallel_seize() -- Begin the process of advancing the scan to a new + * page. Other scans must wait until we call bt_parallel_release() or + * bt_parallel_done(). + * + * The return value tells caller whether to continue the scan or not. The + * true value indicates either one of the following (a) the block number + * returned is valid and the scan can be continued (b) the block number is + * invalid and the scan has just begun (c) the block number is P_NONE and the + * scan is finished. The false value indicates that we have reached the end + * of scan for current scankeys and for that we return block number as P_NONE. + * + * The first time master backend or worker hits last page, it will return + * P_NONE and status as 'True', after that any worker tries to fetch next + * page, it will return status as 'False'. + * + * Callers ignore the value of pageno, if false is returned. + */ +bool +_bt_parallel_seize(IndexScanDesc scan, BlockNumber *pageno) +{ + BTScanOpaque so = (BTScanOpaque) scan->opaque; + BTPS_State pageStatus; + bool exit_loop = false; + bool status = true; + ParallelIndexScanDesc parallel_scan = scan->parallel_scan; + BTParallelScanDesc btscan; + + btscan = (BTParallelScanDesc) OffsetToPointer((void *) parallel_scan, + parallel_scan->ps_offset); + + while (1) + { + /* + * Fetch the next block to scan and update the page status so that + * other participants of parallel scan can wait till next page is + * available for scan. Set the status as false, if scan is finished. + */ + SpinLockAcquire(&btscan->btps_mutex); + pageStatus = btscan->btps_pageStatus; + + /* Check if the scan for current scan keys is finished */ + if (so->arrayKeyCount < btscan->btps_arrayKeyCount) + status = false; + else if (pageStatus == BTPARALLEL_DONE) + status = false; + else if (pageStatus != BTPARALLEL_ADVANCING) + { + btscan->btps_pageStatus = BTPARALLEL_ADVANCING; + *pageno = btscan->btps_scanPage; + exit_loop = true; + } + SpinLockRelease(&btscan->btps_mutex); + if (exit_loop || !status) + break; + ConditionVariableSleep(&btscan->btps_cv, WAIT_EVENT_BTREE_PAGE); + } + ConditionVariableCancelSleep(); + + /* no more pages to scan */ + if (!status) + *pageno = P_NONE; + + return status; +} + +/* + * _bt_parallel_release() -- Complete the process of advancing the scan to a + * new page. We now have the new value btps_scanPage; some other backend + * can now begin advancing the scan. + * + * scan_page - For backward scans, it holds the latest page being scanned + * and for forward scans, it holds the next page to be scanned. + */ +void +_bt_parallel_release(IndexScanDesc scan, BlockNumber scan_page) +{ + ParallelIndexScanDesc parallel_scan = scan->parallel_scan; + BTParallelScanDesc btscan; + + btscan = (BTParallelScanDesc) OffsetToPointer((void *) parallel_scan, + parallel_scan->ps_offset); + + SpinLockAcquire(&btscan->btps_mutex); + btscan->btps_scanPage = scan_page; + btscan->btps_pageStatus = BTPARALLEL_IDLE; + SpinLockRelease(&btscan->btps_mutex); + ConditionVariableSignal(&btscan->btps_cv); +} + +/* + * _bt_parallel_done() -- Mark the parallel scan as complete. + * + * When there are no pages left to scan, this function should be called to + * notify other workers. Otherwise, they might wait forever for the scan to + * advance to the next page. + */ +void +_bt_parallel_done(IndexScanDesc scan) +{ + BTScanOpaque so = (BTScanOpaque) scan->opaque; + ParallelIndexScanDesc parallel_scan = scan->parallel_scan; + BTParallelScanDesc btscan; + bool status_changed = false; + + /* Do nothing, for non-parallel scans */ + if (parallel_scan == NULL) + return; + + btscan = (BTParallelScanDesc) OffsetToPointer((void *) parallel_scan, + parallel_scan->ps_offset); + + /* + * Ensure to mark parallel scan as done no more than once for single scan. + * We rely on this state to initiate the next scan for multiple array + * keys, see _bt_advance_array_keys. + */ + SpinLockAcquire(&btscan->btps_mutex); + if (so->arrayKeyCount >= btscan->btps_arrayKeyCount && + btscan->btps_pageStatus != BTPARALLEL_DONE) + { + btscan->btps_pageStatus = BTPARALLEL_DONE; + status_changed = true; + } + SpinLockRelease(&btscan->btps_mutex); + + /* wake up all the workers associated with this parallel scan */ + if (status_changed) + ConditionVariableBroadcast(&btscan->btps_cv); +} + +/* + * _bt_parallel_advance_array_keys() -- Advances the parallel scan for array + * keys. + * + * Updates the count of array keys processed for both local and parallel + * scans. + */ +void +_bt_parallel_advance_array_keys(IndexScanDesc scan) +{ + BTScanOpaque so = (BTScanOpaque) scan->opaque; + ParallelIndexScanDesc parallel_scan = scan->parallel_scan; + BTParallelScanDesc btscan; + + btscan = (BTParallelScanDesc) OffsetToPointer((void *) parallel_scan, + parallel_scan->ps_offset); + + so->arrayKeyCount++; + SpinLockAcquire(&btscan->btps_mutex); + if (btscan->btps_pageStatus == BTPARALLEL_DONE) + { + btscan->btps_scanPage = InvalidBlockNumber; + btscan->btps_pageStatus = BTPARALLEL_NOT_INITIALIZED; + btscan->btps_arrayKeyCount++; + } + SpinLockRelease(&btscan->btps_mutex); +} + +/* * Bulk deletion of all index entries pointing to a set of heap tuples. * The set of target tuples is specified via a callback routine that tells * whether any given heap tuple (identified by ItemPointer) is being deleted. diff --git a/src/backend/access/nbtree/nbtsearch.c b/src/backend/access/nbtree/nbtsearch.c index b6459d2..a211c35 100644 --- a/src/backend/access/nbtree/nbtsearch.c +++ b/src/backend/access/nbtree/nbtsearch.c @@ -30,9 +30,12 @@ static bool _bt_readpage(IndexScanDesc scan, ScanDirection dir, static void _bt_saveitem(BTScanOpaque so, int itemIndex, OffsetNumber offnum, IndexTuple itup); static bool _bt_steppage(IndexScanDesc scan, ScanDirection dir); +static bool _bt_readnextpage(IndexScanDesc scan, BlockNumber blkno, ScanDirection dir); +static bool _bt_parallel_readpage(IndexScanDesc scan, BlockNumber blkno, ScanDirection dir); static Buffer _bt_walk_left(Relation rel, Buffer buf, Snapshot snapshot); static bool _bt_endpoint(IndexScanDesc scan, ScanDirection dir); static void _bt_drop_lock_and_maybe_pin(IndexScanDesc scan, BTScanPos sp); +static inline void _bt_initialize_more_data(BTScanOpaque so, ScanDirection dir); /* @@ -544,8 +547,10 @@ _bt_first(IndexScanDesc scan, ScanDirection dir) ScanKeyData notnullkeys[INDEX_MAX_KEYS]; int keysCount = 0; int i; + bool status = true; StrategyNumber strat_total; BTScanPosItem *currItem; + BlockNumber blkno; Assert(!BTScanPosIsValid(so->currPos)); @@ -564,6 +569,38 @@ _bt_first(IndexScanDesc scan, ScanDirection dir) if (!so->qual_ok) return false; + /* + * For parallel scans, get the page from shared state. If scan has not + * started, proceed to find out first leaf page to scan by keeping other + * workers waiting until we have descended to appropriate leaf page to be + * scanned for matching tuples. + * + * If the scan has already begun, skip finding the first leaf page and + * directly scanning the page stored in shared structure or the page to + * its left in case of backward scan. + */ + if (scan->parallel_scan != NULL) + { + status = _bt_parallel_seize(scan, &blkno); + if (!status) + { + BTScanPosInvalidate(so->currPos); + return false; + } + else if (blkno == P_NONE) + { + _bt_parallel_done(scan); + BTScanPosInvalidate(so->currPos); + return false; + } + else if (blkno != InvalidBlockNumber) + { + if (!_bt_parallel_readpage(scan, blkno, dir)) + return false; + goto readcomplete; + } + } + /*---------- * Examine the scan keys to discover where we need to start the scan. * @@ -743,7 +780,19 @@ _bt_first(IndexScanDesc scan, ScanDirection dir) * there. */ if (keysCount == 0) - return _bt_endpoint(scan, dir); + { + bool match; + + match = _bt_endpoint(scan, dir); + if (!match) + { + /* No match , indicate (parallel) scan finished */ + _bt_parallel_done(scan); + BTScanPosInvalidate(so->currPos); + } + + return match; + } /* * We want to start the scan somewhere within the index. Set up an @@ -993,25 +1042,21 @@ _bt_first(IndexScanDesc scan, ScanDirection dir) * because nothing finer to lock exists. */ PredicateLockRelation(rel, scan->xs_snapshot); + + /* + * mark parallel scan as done, so that all the workers can finish + * their scan + */ + _bt_parallel_done(scan); + BTScanPosInvalidate(so->currPos); + return false; } else PredicateLockPage(rel, BufferGetBlockNumber(buf), scan->xs_snapshot); - /* initialize moreLeft/moreRight appropriately for scan direction */ - if (ScanDirectionIsForward(dir)) - { - so->currPos.moreLeft = false; - so->currPos.moreRight = true; - } - else - { - so->currPos.moreLeft = true; - so->currPos.moreRight = false; - } - so->numKilled = 0; /* just paranoia */ - Assert(so->markItemIndex == -1); + _bt_initialize_more_data(so, dir); /* position to the precise item on the page */ offnum = _bt_binsrch(rel, buf, keysCount, scankeys, nextkey); @@ -1060,6 +1105,7 @@ _bt_first(IndexScanDesc scan, ScanDirection dir) _bt_drop_lock_and_maybe_pin(scan, &so->currPos); } +readcomplete: /* OK, itemIndex says what to return */ currItem = &so->currPos.items[so->currPos.itemIndex]; scan->xs_ctup.t_self = currItem->heapTid; @@ -1132,6 +1178,10 @@ _bt_next(IndexScanDesc scan, ScanDirection dir) * moreLeft or moreRight (as appropriate) is cleared if _bt_checkkeys reports * that there can be no more matching tuples in the current scan direction. * + * In the case of parallel scans, caller must have called _bt_parallel_seize + * and _bt_parallel_release will be called in this function to advance the + * parallel scan. + * * Returns true if any matching items found on the page, false if none. */ static bool @@ -1154,6 +1204,16 @@ _bt_readpage(IndexScanDesc scan, ScanDirection dir, OffsetNumber offnum) page = BufferGetPage(so->currPos.buf); opaque = (BTPageOpaque) PageGetSpecialPointer(page); + + /* allow next page be processed by parallel worker */ + if (scan->parallel_scan) + { + if (ScanDirectionIsForward(dir)) + _bt_parallel_release(scan, opaque->btpo_next); + else + _bt_parallel_release(scan, BufferGetBlockNumber(so->currPos.buf)); + } + minoff = P_FIRSTDATAKEY(opaque); maxoff = PageGetMaxOffsetNumber(page); @@ -1278,21 +1338,16 @@ _bt_saveitem(BTScanOpaque so, int itemIndex, * if pinned, we'll drop the pin before moving to next page. The buffer is * not locked on entry. * - * On success exit, so->currPos is updated to contain data from the next - * interesting page. For success on a scan using a non-MVCC snapshot we hold - * a pin, but not a read lock, on that page. If we do not hold the pin, we - * set so->currPos.buf to InvalidBuffer. We return TRUE to indicate success. - * - * If there are no more matching records in the given direction, we drop all - * locks and pins, set so->currPos.buf to InvalidBuffer, and return FALSE. + * For success on a scan using a non-MVCC snapshot we hold a pin, but not a + * read lock, on that page. If we do not hold the pin, we set so->currPos.buf + * to InvalidBuffer. We return TRUE to indicate success. */ static bool _bt_steppage(IndexScanDesc scan, ScanDirection dir) { BTScanOpaque so = (BTScanOpaque) scan->opaque; - Relation rel; - Page page; - BTPageOpaque opaque; + BlockNumber blkno = InvalidBlockNumber; + bool status = true; Assert(BTScanPosIsValid(so->currPos)); @@ -1319,13 +1374,27 @@ _bt_steppage(IndexScanDesc scan, ScanDirection dir) so->markItemIndex = -1; } - rel = scan->indexRelation; - if (ScanDirectionIsForward(dir)) { /* Walk right to the next page with data */ - /* We must rely on the previously saved nextPage link! */ - BlockNumber blkno = so->currPos.nextPage; + + /* + * We must rely on the previously saved nextPage link for non-parallel + * scans! + */ + if (scan->parallel_scan != NULL) + { + status = _bt_parallel_seize(scan, &blkno); + if (!status) + { + /* release the previous buffer, if pinned */ + BTScanPosUnpinIfPinned(so->currPos); + BTScanPosInvalidate(so->currPos); + return false; + } + } + else + blkno = so->currPos.nextPage; /* Remember we left a page with data */ so->currPos.moreLeft = true; @@ -1333,11 +1402,72 @@ _bt_steppage(IndexScanDesc scan, ScanDirection dir) /* release the previous buffer, if pinned */ BTScanPosUnpinIfPinned(so->currPos); + if (!_bt_readnextpage(scan, blkno, dir)) + return false; + } + else + { + /* Remember we left a page with data */ + so->currPos.moreRight = true; + + /* For parallel scans, get the last page scanned */ + if (scan->parallel_scan != NULL) + { + status = _bt_parallel_seize(scan, &blkno); + BTScanPosUnpinIfPinned(so->currPos); + if (!status) + { + BTScanPosInvalidate(so->currPos); + return false; + } + } + + if (!_bt_readnextpage(scan, blkno, dir)) + return false; + } + + /* Drop the lock, and maybe the pin, on the current page */ + _bt_drop_lock_and_maybe_pin(scan, &so->currPos); + + return true; +} + +/* + * _bt_readnextpage() -- Read next page containing valid data for scan + * + * On success exit, so->currPos is updated to contain data from the next + * interesting page. Caller is responsible to release lock and pin on + * buffer on success. We return TRUE to indicate success. + * + * If there are no more matching records in the given direction, we drop all + * locks and pins, set so->currPos.buf to InvalidBuffer, and return FALSE. + * + * Callers pass valid blkno for forward and backward scans with an exception + * for backward scans in which case so->currPos is expected to contain a valid + * value. + */ +static bool +_bt_readnextpage(IndexScanDesc scan, BlockNumber blkno, ScanDirection dir) +{ + BTScanOpaque so = (BTScanOpaque) scan->opaque; + Relation rel; + Page page; + BTPageOpaque opaque; + bool status = true; + + rel = scan->indexRelation; + + if (ScanDirectionIsForward(dir)) + { for (;;) { - /* if we're at end of scan, give up */ + /* + * if we're at end of scan, give up and mark parallel scan as + * done, so that all the workers can finish their scan + */ if (blkno == P_NONE || !so->currPos.moreRight) { + _bt_parallel_done(scan); BTScanPosInvalidate(so->currPos); return false; } @@ -1359,14 +1489,30 @@ _bt_steppage(IndexScanDesc scan, ScanDirection dir) } /* nope, keep going */ - blkno = opaque->btpo_next; + if (scan->parallel_scan != NULL) + { + status = _bt_parallel_seize(scan, &blkno); + if (!status) + { + _bt_relbuf(rel, so->currPos.buf); + BTScanPosInvalidate(so->currPos); + return false; + } + } + else + blkno = opaque->btpo_next; _bt_relbuf(rel, so->currPos.buf); } } else { - /* Remember we left a page with data */ - so->currPos.moreRight = true; + /* + * for parallel scans, current block number needs to be retrieved from + * shared state and it is the responsibility of caller to pass the + * correct block number. + */ + if (blkno != InvalidBlockNumber) + so->currPos.currPage = blkno; /* * Walk left to the next page with data. This is much more complex @@ -1401,6 +1547,12 @@ _bt_steppage(IndexScanDesc scan, ScanDirection dir) if (!so->currPos.moreLeft) { _bt_relbuf(rel, so->currPos.buf); + + /* + * mark parallel scan as done, so that all the workers can + * finish their scan + */ + _bt_parallel_done(scan); BTScanPosInvalidate(so->currPos); return false; } @@ -1412,6 +1564,7 @@ _bt_steppage(IndexScanDesc scan, ScanDirection dir) /* if we're physically at end of index, return failure */ if (so->currPos.buf == InvalidBuffer) { + _bt_parallel_done(scan); BTScanPosInvalidate(so->currPos); return false; } @@ -1432,9 +1585,46 @@ _bt_steppage(IndexScanDesc scan, ScanDirection dir) if (_bt_readpage(scan, dir, PageGetMaxOffsetNumber(page))) break; } + + /* + * For parallel scans, get the last page scanned as it is quite + * possible that by the time we try to fetch previous page, other + * worker has also decided to scan that previous page. So we + * can't rely on _bt_walk_left call. + */ + if (scan->parallel_scan != NULL) + { + _bt_relbuf(rel, so->currPos.buf); + status = _bt_parallel_seize(scan, &blkno); + if (!status) + { + BTScanPosInvalidate(so->currPos); + return false; + } + so->currPos.buf = _bt_getbuf(rel, blkno, BT_READ); + } } } + return true; +} + +/* + * _bt_parallel_readpage() -- Read current page containing valid data for scan + * + * On success, release lock and pin on buffer. We return TRUE to indicate + * success. + */ +static bool +_bt_parallel_readpage(IndexScanDesc scan, BlockNumber blkno, ScanDirection dir) +{ + BTScanOpaque so = (BTScanOpaque) scan->opaque; + + _bt_initialize_more_data(so, dir); + + if (!_bt_readnextpage(scan, blkno, dir)) + return false; + /* Drop the lock, and maybe the pin, on the current page */ _bt_drop_lock_and_maybe_pin(scan, &so->currPos); @@ -1712,19 +1902,7 @@ _bt_endpoint(IndexScanDesc scan, ScanDirection dir) /* remember which buffer we have pinned */ so->currPos.buf = buf; - /* initialize moreLeft/moreRight appropriately for scan direction */ - if (ScanDirectionIsForward(dir)) - { - so->currPos.moreLeft = false; - so->currPos.moreRight = true; - } - else - { - so->currPos.moreLeft = true; - so->currPos.moreRight = false; - } - so->numKilled = 0; /* just paranoia */ - so->markItemIndex = -1; /* ditto */ + _bt_initialize_more_data(so, dir); /* * Now load data from the first page of the scan. @@ -1753,3 +1931,25 @@ _bt_endpoint(IndexScanDesc scan, ScanDirection dir) return true; } + +/* + * _bt_initialize_more_data() -- initialize moreLeft/moreRight appropriately + * for scan direction + */ +static inline void +_bt_initialize_more_data(BTScanOpaque so, ScanDirection dir) +{ + /* initialize moreLeft/moreRight appropriately for scan direction */ + if (ScanDirectionIsForward(dir)) + { + so->currPos.moreLeft = false; + so->currPos.moreRight = true; + } + else + { + so->currPos.moreLeft = true; + so->currPos.moreRight = false; + } + so->numKilled = 0; /* just paranoia */ + so->markItemIndex = -1; /* ditto */ +} diff --git a/src/backend/access/nbtree/nbtutils.c b/src/backend/access/nbtree/nbtutils.c index da0f330..5b259a3 100644 --- a/src/backend/access/nbtree/nbtutils.c +++ b/src/backend/access/nbtree/nbtutils.c @@ -590,6 +590,10 @@ _bt_advance_array_keys(IndexScanDesc scan, ScanDirection dir) break; } + /* advance parallel scan */ + if (scan->parallel_scan != NULL) + _bt_parallel_advance_array_keys(scan); + return found; } diff --git a/src/backend/postmaster/pgstat.c b/src/backend/postmaster/pgstat.c index 7176cf1..92af6ec 100644 --- a/src/backend/postmaster/pgstat.c +++ b/src/backend/postmaster/pgstat.c @@ -3392,6 +3392,9 @@ pgstat_get_wait_ipc(WaitEventIPC w) case WAIT_EVENT_PARALLEL_FINISH: event_name = "ParallelFinish"; break; + case WAIT_EVENT_BTREE_PAGE: + event_name = "ParallelBtreePage"; + break; case WAIT_EVENT_SAFE_SNAPSHOT: event_name = "SafeSnapshot"; break; diff --git a/src/include/access/nbtree.h b/src/include/access/nbtree.h index 344ef99..4c01a2f 100644 --- a/src/include/access/nbtree.h +++ b/src/include/access/nbtree.h @@ -609,6 +609,8 @@ typedef struct BTScanOpaqueData ScanKey arrayKeyData; /* modified copy of scan->keyData */ int numArrayKeys; /* number of equality-type array keys (-1 if * there are any unsatisfiable array keys) */ + int arrayKeyCount; /* count indicating number of array scan keys + * processed */ BTArrayKeyInfo *arrayKeys; /* info about each equality-type array key */ MemoryContext arrayContext; /* scan-lifespan context for array data */ @@ -652,7 +654,8 @@ typedef BTScanOpaqueData *BTScanOpaque; #define SK_BT_NULLS_FIRST (INDOPTION_NULLS_FIRST << SK_BT_INDOPTION_SHIFT) /* - * prototypes for functions in nbtree.c (external entry points for btree) + * prototypes for functions in nbtree.c (external entry points for btree and + * functions to maintain state of parallel scan) */ extern IndexBuildResult *btbuild(Relation heap, Relation index, struct IndexInfo *indexInfo); @@ -662,10 +665,17 @@ extern bool btinsert(Relation rel, Datum *values, bool *isnull, IndexUniqueCheck checkUnique, struct IndexInfo *indexInfo); extern IndexScanDesc btbeginscan(Relation rel, int nkeys, int norderbys); +extern Size btestimateparallelscan(void); +extern void btinitparallelscan(void *target); +extern bool _bt_parallel_seize(IndexScanDesc scan, BlockNumber *pageno); +extern void _bt_parallel_release(IndexScanDesc scan, BlockNumber scan_page); +extern void _bt_parallel_done(IndexScanDesc scan); +extern void _bt_parallel_advance_array_keys(IndexScanDesc scan); extern bool btgettuple(IndexScanDesc scan, ScanDirection dir); extern int64 btgetbitmap(IndexScanDesc scan, TIDBitmap *tbm); extern void btrescan(IndexScanDesc scan, ScanKey scankey, int nscankeys, ScanKey orderbys, int norderbys); +extern void btparallelrescan(IndexScanDesc scan); extern void btendscan(IndexScanDesc scan); extern void btmarkpos(IndexScanDesc scan); extern void btrestrpos(IndexScanDesc scan); diff --git a/src/include/pgstat.h b/src/include/pgstat.h index de8225b..915cc85 100644 --- a/src/include/pgstat.h +++ b/src/include/pgstat.h @@ -786,6 +786,7 @@ typedef enum WAIT_EVENT_MQ_RECEIVE, WAIT_EVENT_MQ_SEND, WAIT_EVENT_PARALLEL_FINISH, + WAIT_EVENT_BTREE_PAGE, WAIT_EVENT_SAFE_SNAPSHOT, WAIT_EVENT_SYNC_REP } WaitEventIPC; diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list index c4235ae..9f876ae 100644 --- a/src/tools/pgindent/typedefs.list +++ b/src/tools/pgindent/typedefs.list @@ -161,6 +161,9 @@ BTPageOpaque BTPageOpaqueData BTPageStat BTPageState +BTParallelScanDesc +BTParallelScanDescData +BTPS_State BTScanOpaque BTScanOpaqueData BTScanPos