diff --git a/doc/src/sgml/indexam.sgml b/doc/src/sgml/indexam.sgml index 40f201b..8d4f9f7 100644 --- a/doc/src/sgml/indexam.sgml +++ b/doc/src/sgml/indexam.sgml @@ -77,61 +77,64 @@ The structure IndexAmRoutine is defined thus: -typedef struct IndexAmRoutine -{ - NodeTag type; - - /* - * Total number of strategies (operators) by which we can traverse/search - * this AM. Zero if AM does not have a fixed set of strategy assignments. - */ - uint16 amstrategies; - /* total number of support functions that this AM uses */ - uint16 amsupport; - /* does AM support ORDER BY indexed column's value? */ - bool amcanorder; - /* does AM support ORDER BY result of an operator on indexed column? */ - bool amcanorderbyop; - /* does AM support backward scanning? */ - bool amcanbackward; - /* does AM support UNIQUE indexes? */ - bool amcanunique; - /* does AM support multi-column indexes? */ - bool amcanmulticol; - /* does AM require scans to have a constraint on the first index column? */ - bool amoptionalkey; - /* does AM handle ScalarArrayOpExpr quals? */ - bool amsearcharray; - /* does AM handle IS NULL/IS NOT NULL quals? */ - bool amsearchnulls; - /* can index storage data type differ from column data type? */ - bool amstorage; - /* can an index of this type be clustered on? */ - bool amclusterable; - /* does AM handle predicate locks? */ - bool ampredlocks; - /* type of data stored in index, or InvalidOid if variable */ - Oid amkeytype; - - /* interface functions */ - ambuild_function ambuild; - ambuildempty_function ambuildempty; - aminsert_function aminsert; - ambulkdelete_function ambulkdelete; - amvacuumcleanup_function amvacuumcleanup; - amcanreturn_function amcanreturn; /* can be NULL */ - amcostestimate_function amcostestimate; - amoptions_function amoptions; - amproperty_function amproperty; /* can be NULL */ - amvalidate_function amvalidate; - ambeginscan_function ambeginscan; - amrescan_function amrescan; - amgettuple_function amgettuple; /* can be NULL */ - amgetbitmap_function amgetbitmap; /* can be NULL */ - amendscan_function amendscan; - ammarkpos_function ammarkpos; /* can be NULL */ - amrestrpos_function amrestrpos; /* can be NULL */ -} IndexAmRoutine; + typedef struct IndexAmRoutine + { + NodeTag type; + + /* + * Total number of strategies (operators) by which we can traverse/search + * this AM. Zero if AM does not have a fixed set of strategy assignments. + */ + uint16 amstrategies; + /* total number of support functions that this AM uses */ + uint16 amsupport; + /* does AM support ORDER BY indexed column's value? */ + bool amcanorder; + /* does AM support ORDER BY result of an operator on indexed column? */ + bool amcanorderbyop; + /* does AM support backward scanning? */ + bool amcanbackward; + /* does AM support UNIQUE indexes? */ + bool amcanunique; + /* does AM support multi-column indexes? */ + bool amcanmulticol; + /* does AM require scans to have a constraint on the first index column? */ + bool amoptionalkey; + /* does AM handle ScalarArrayOpExpr quals? */ + bool amsearcharray; + /* does AM handle IS NULL/IS NOT NULL quals? */ + bool amsearchnulls; + /* can index storage data type differ from column data type? */ + bool amstorage; + /* can an index of this type be clustered on? */ + bool amclusterable; + /* does AM handle predicate locks? 
*/ + bool ampredlocks; + /* type of data stored in index, or InvalidOid if variable */ + Oid amkeytype; + + /* interface functions */ + ambuild_function ambuild; + ambuildempty_function ambuildempty; + aminsert_function aminsert; + ambulkdelete_function ambulkdelete; + amvacuumcleanup_function amvacuumcleanup; + amcanreturn_function amcanreturn; /* can be NULL */ + amcostestimate_function amcostestimate; + amoptions_function amoptions; + amproperty_function amproperty; /* can be NULL */ + amvalidate_function amvalidate; + amestimateparallelscan_function amestimateparallelscan; /* can be NULL */ + aminitparallelscan_function aminitparallelscan; /* can be NULL */ + ambeginscan_function ambeginscan; + amrescan_function amrescan; + amparallelrescan_function amparallelrescan; /* can be NULL */ + amgettuple_function amgettuple; /* can be NULL */ + amgetbitmap_function amgetbitmap; /* can be NULL */ + amendscan_function amendscan; + ammarkpos_function ammarkpos; /* can be NULL */ + amrestrpos_function amrestrpos; /* can be NULL */ + } IndexAmRoutine; @@ -459,6 +462,36 @@ amvalidate (Oid opclassoid); invalid. Problems should be reported with ereport messages. + + +Size +amestimateparallelscan (Size index_size); + + Estimate and return the storage space required for a parallel index scan. + The index_size parameter indicates the size of the generic parallel + index scan information. The size of the index-type-specific parallel + information should be added to index_size and the result returned to the + caller. + + + + +void +aminitparallelscan (void *target); + + Initialize the parallel index scan state. It is used to initialize the + index-type-specific parallel information, which is stored immediately + after the generic parallel information required for parallel index scans. + The required state information is set up in target. + + + + The aminitparallelscan and amestimateparallelscan + functions need only be provided if the access method supports parallel + index scans. If it doesn't, the aminitparallelscan and + amestimateparallelscan fields in its IndexAmRoutine + struct must be set to NULL. + The purpose of an index, of course, is to support scans for tuples matching @@ -511,6 +544,23 @@ amrescan (IndexScanDesc scan, +void +amparallelrescan (IndexScanDesc scan); + + Restart the parallel index scan: it resets the shared parallel index scan + state. It must be called only when the scan is restarted, as is typically + required for the inner side of a nested-loop join. + + + + The amparallelrescan function need only be provided if the + access method supports parallel index scans. If it doesn't, + the amparallelrescan field in its IndexAmRoutine + struct must be set to NULL. + + + + boolean amgettuple (IndexScanDesc scan, ScanDirection direction); diff --git a/doc/src/sgml/monitoring.sgml b/doc/src/sgml/monitoring.sgml index 1545f03..42ef91f 100644 --- a/doc/src/sgml/monitoring.sgml +++ b/doc/src/sgml/monitoring.sgml @@ -1199,7 +1199,7 @@ postgres 27093 0.0 0.0 30096 2752 ? Ss 11:34 0:00 postgres: ser Waiting in an extension. - IPC + IPC BgWorkerShutdown Waiting for background worker to shut down. @@ -1232,6 +1232,10 @@ postgres 27093 0.0 0.0 30096 2752 ? Ss 11:34 0:00 postgres: ser Waiting for parallel workers to finish computing. + ParallelIndexScan + Waiting for next block to be available for parallel index scan. + + SafeSnapshot Waiting for a snapshot for a READ ONLY DEFERRABLE transaction. 
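[Editor's note, not part of the patch: to make the new IndexAmRoutine callbacks concrete, here is a minimal sketch of how a hypothetical access method might implement them. The names FooParallelScanDescData, fooestimateparallelscan, fooinitparallelscan and fooparallelrescan are illustrative only; they mirror what the btree implementation below (btestimateparallelscan, btinitparallelscan, btparallelrescan) does, and AMs that do not support parallel scans simply leave all three fields NULL, as the brin/gin/gist/hash/spgist handlers in this patch do.]

#include "postgres.h"
#include "access/relscan.h"     /* OffsetToPointer, IndexScanDesc */
#include "storage/shmem.h"      /* add_size */
#include "storage/spin.h"

/* Hypothetical AM-specific shared state for a parallel scan */
typedef struct FooParallelScanDescData
{
    BlockNumber fps_nextPage;   /* next page to hand out to a worker */
    slock_t     fps_mutex;      /* protects fps_nextPage */
} FooParallelScanDescData;

/* amestimateparallelscan: add our shared-state size to the generic size */
static Size
fooestimateparallelscan(Size index_size)
{
    return add_size(index_size, sizeof(FooParallelScanDescData));
}

/* aminitparallelscan: target points at our area, just past the generic part */
static void
fooinitparallelscan(void *target)
{
    FooParallelScanDescData *fps = (FooParallelScanDescData *) target;

    SpinLockInit(&fps->fps_mutex);
    fps->fps_nextPage = InvalidBlockNumber;
}

/* amparallelrescan: reset the shared state for a fresh scan */
static void
fooparallelrescan(IndexScanDesc scan)
{
    FooParallelScanDescData *fps = (FooParallelScanDescData *)
        OffsetToPointer((void *) scan->parallel_scan,
                        scan->parallel_scan->ps_offset);

    SpinLockAcquire(&fps->fps_mutex);
    fps->fps_nextPage = InvalidBlockNumber;
    SpinLockRelease(&fps->fps_mutex);
}

/* in the AM's handler function, alongside the existing assignments: */
/*     amroutine->amestimateparallelscan = fooestimateparallelscan;  */
/*     amroutine->aminitparallelscan = fooinitparallelscan;          */
/*     amroutine->amparallelrescan = fooparallelrescan;              */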
diff --git a/src/backend/access/brin/brin.c b/src/backend/access/brin/brin.c index 1b45a4c..5ff6a0f 100644 --- a/src/backend/access/brin/brin.c +++ b/src/backend/access/brin/brin.c @@ -104,8 +104,11 @@ brinhandler(PG_FUNCTION_ARGS) amroutine->amoptions = brinoptions; amroutine->amproperty = NULL; amroutine->amvalidate = brinvalidate; + amroutine->amestimateparallelscan = NULL; + amroutine->aminitparallelscan = NULL; amroutine->ambeginscan = brinbeginscan; amroutine->amrescan = brinrescan; + amroutine->amparallelrescan = NULL; amroutine->amgettuple = NULL; amroutine->amgetbitmap = bringetbitmap; amroutine->amendscan = brinendscan; diff --git a/src/backend/access/gin/ginutil.c b/src/backend/access/gin/ginutil.c index f07eedc..6f7024e 100644 --- a/src/backend/access/gin/ginutil.c +++ b/src/backend/access/gin/ginutil.c @@ -61,8 +61,11 @@ ginhandler(PG_FUNCTION_ARGS) amroutine->amoptions = ginoptions; amroutine->amproperty = NULL; amroutine->amvalidate = ginvalidate; + amroutine->amestimateparallelscan = NULL; + amroutine->aminitparallelscan = NULL; amroutine->ambeginscan = ginbeginscan; amroutine->amrescan = ginrescan; + amroutine->amparallelrescan = NULL; amroutine->amgettuple = NULL; amroutine->amgetbitmap = gingetbitmap; amroutine->amendscan = ginendscan; diff --git a/src/backend/access/gist/gist.c b/src/backend/access/gist/gist.c index b8aa9bc..5727ef9 100644 --- a/src/backend/access/gist/gist.c +++ b/src/backend/access/gist/gist.c @@ -81,8 +81,11 @@ gisthandler(PG_FUNCTION_ARGS) amroutine->amoptions = gistoptions; amroutine->amproperty = gistproperty; amroutine->amvalidate = gistvalidate; + amroutine->amestimateparallelscan = NULL; + amroutine->aminitparallelscan = NULL; amroutine->ambeginscan = gistbeginscan; amroutine->amrescan = gistrescan; + amroutine->amparallelrescan = NULL; amroutine->amgettuple = gistgettuple; amroutine->amgetbitmap = gistgetbitmap; amroutine->amendscan = gistendscan; diff --git a/src/backend/access/hash/hash.c b/src/backend/access/hash/hash.c index 1fa087a..9e5740b 100644 --- a/src/backend/access/hash/hash.c +++ b/src/backend/access/hash/hash.c @@ -78,8 +78,11 @@ hashhandler(PG_FUNCTION_ARGS) amroutine->amoptions = hashoptions; amroutine->amproperty = NULL; amroutine->amvalidate = hashvalidate; + amroutine->amestimateparallelscan = NULL; + amroutine->aminitparallelscan = NULL; amroutine->ambeginscan = hashbeginscan; amroutine->amrescan = hashrescan; + amroutine->amparallelrescan = NULL; amroutine->amgettuple = hashgettuple; amroutine->amgetbitmap = hashgetbitmap; amroutine->amendscan = hashendscan; diff --git a/src/backend/access/index/genam.c b/src/backend/access/index/genam.c index 65c941d..84ebd72 100644 --- a/src/backend/access/index/genam.c +++ b/src/backend/access/index/genam.c @@ -375,7 +375,7 @@ systable_beginscan(Relation heapRelation, } sysscan->iscan = index_beginscan(heapRelation, irel, - snapshot, nkeys, 0); + snapshot, NULL, nkeys, 0, false); index_rescan(sysscan->iscan, key, nkeys, NULL, 0); sysscan->scan = NULL; } @@ -577,7 +577,7 @@ systable_beginscan_ordered(Relation heapRelation, } sysscan->iscan = index_beginscan(heapRelation, indexRelation, - snapshot, nkeys, 0); + snapshot, NULL, nkeys, 0, false); index_rescan(sysscan->iscan, key, nkeys, NULL, 0); sysscan->scan = NULL; diff --git a/src/backend/access/index/indexam.c b/src/backend/access/index/indexam.c index 54b71cb..72e1b03 100644 --- a/src/backend/access/index/indexam.c +++ b/src/backend/access/index/indexam.c @@ -207,6 +207,70 @@ index_insert(Relation indexRelation, } /* + * 
index_parallelscan_estimate - estimate storage for ParallelIndexScanDesc + * + * It calls the AM-specific routine to obtain the size of the AM-specific + * shared information. + */ +Size +index_parallelscan_estimate(Relation indexrel, Snapshot snapshot) +{ + Size index_size = add_size(offsetof(ParallelIndexScanDescData, ps_snapshot_data), + EstimateSnapshotSpace(snapshot)); + + /* amestimateparallelscan is optional; assume no-op if not provided by AM */ + if (indexrel->rd_amroutine->amestimateparallelscan == NULL) + return index_size; + else + return indexrel->rd_amroutine->amestimateparallelscan(index_size); +} + +/* + * index_parallelscan_initialize - initialize ParallelIndexScanDesc + * + * It calls the access-method-specific initialization routine to initialize + * the AM-specific information. Call this just once in the leader process; + * individual workers then attach via index_beginscan_parallel. + */ +void +index_parallelscan_initialize(Relation heaprel, Relation indexrel, Snapshot snapshot, ParallelIndexScanDesc target) +{ + Size offset = add_size(offsetof(ParallelIndexScanDescData, ps_snapshot_data), + EstimateSnapshotSpace(snapshot)); + void *amtarget = (char *) ((void *) target) + offset; + + target->ps_relid = RelationGetRelid(heaprel); + target->ps_indexid = RelationGetRelid(indexrel); + target->ps_offset = offset; + SerializeSnapshot(snapshot, target->ps_snapshot_data); + + /* aminitparallelscan is optional; assume no-op if not provided by AM */ + if (indexrel->rd_amroutine->aminitparallelscan == NULL) + return; + + indexrel->rd_amroutine->aminitparallelscan(amtarget); +} + +/* + * index_beginscan_parallel - join parallel index scan + * + * Caller must be holding suitable locks on the heap and the index. + */ +IndexScanDesc +index_beginscan_parallel(Relation heaprel, Relation indexrel, int nkeys, + int norderbys, ParallelIndexScanDesc pscan) +{ + Snapshot snapshot; + IndexScanDesc scan; + + Assert(RelationGetRelid(heaprel) == pscan->ps_relid); + snapshot = RestoreSnapshot(pscan->ps_snapshot_data); + RegisterSnapshot(snapshot); + scan = index_beginscan(heaprel, indexrel, snapshot, pscan, nkeys, norderbys, true); + return scan; +} + +/* * index_beginscan - start a scan of an index with amgettuple * * Caller must be holding suitable locks on the heap and the index. @@ -214,8 +278,8 @@ index_insert(Relation indexRelation, IndexScanDesc index_beginscan(Relation heapRelation, Relation indexRelation, - Snapshot snapshot, - int nkeys, int norderbys) + Snapshot snapshot, ParallelIndexScanDesc pscan, + int nkeys, int norderbys, bool temp_snap) { IndexScanDesc scan; @@ -227,6 +291,8 @@ index_beginscan(Relation heapRelation, */ scan->heapRelation = heapRelation; scan->xs_snapshot = snapshot; + scan->parallel_scan = pscan; + scan->xs_temp_snap = temp_snap; return scan; } @@ -251,6 +317,8 @@ index_beginscan_bitmap(Relation indexRelation, * up by RelationGetIndexScan. 
*/ scan->xs_snapshot = snapshot; + scan->parallel_scan = NULL; + scan->xs_temp_snap = false; return scan; } @@ -319,6 +387,22 @@ index_rescan(IndexScanDesc scan, } /* ---------------- + * index_parallelrescan - (re)start a parallel scan of an index + * ---------------- + */ +void +index_parallelrescan(IndexScanDesc scan) +{ + SCAN_CHECKS; + + /* amparallelrescan is optional; assume no-op if not provided by AM */ + if (scan->indexRelation->rd_amroutine->amparallelrescan == NULL) + return; + + scan->indexRelation->rd_amroutine->amparallelrescan(scan); +} + +/* ---------------- * index_endscan - end a scan * ---------------- */ @@ -341,6 +425,9 @@ index_endscan(IndexScanDesc scan) /* Release index refcount acquired by index_beginscan */ RelationDecrementReferenceCount(scan->indexRelation); + if (scan->xs_temp_snap) + UnregisterSnapshot(scan->xs_snapshot); + /* Release the scan data structure itself */ IndexScanEnd(scan); } diff --git a/src/backend/access/nbtree/nbtree.c b/src/backend/access/nbtree/nbtree.c index a264b92..e749a63 100644 --- a/src/backend/access/nbtree/nbtree.c +++ b/src/backend/access/nbtree/nbtree.c @@ -23,6 +23,8 @@ #include "access/xlog.h" #include "catalog/index.h" #include "commands/vacuum.h" +#include "pgstat.h" +#include "storage/condition_variable.h" #include "storage/indexfsm.h" #include "storage/ipc.h" #include "storage/lmgr.h" @@ -62,6 +64,25 @@ typedef struct MemoryContext pagedelcontext; } BTVacState; +/* + * BTParallelScanDescData contains btree specific shared information required + * for parallel scan. + */ +typedef struct BTParallelScanDescData +{ + BlockNumber ps_nextPage; /* latest or next page to be scanned */ + uint8 ps_pageStatus; /* indicates whether next page is available + * for scan. see nbtree.h for possible states + * of parallel scan. */ + int ps_arrayKeyCount; /* count indicating number of array + * scan keys processed by parallel + * scan */ + slock_t ps_mutex; /* protects above variables */ + ConditionVariable cv; /* used to synchronize parallel scan */ +} BTParallelScanDescData; + +typedef struct BTParallelScanDescData *BTParallelScanDesc; + static void btbuildCallback(Relation index, HeapTuple htup, @@ -110,8 +131,11 @@ bthandler(PG_FUNCTION_ARGS) amroutine->amoptions = btoptions; amroutine->amproperty = btproperty; amroutine->amvalidate = btvalidate; + amroutine->amestimateparallelscan = btestimateparallelscan; + amroutine->aminitparallelscan = btinitparallelscan; amroutine->ambeginscan = btbeginscan; amroutine->amrescan = btrescan; + amroutine->amparallelrescan = btparallelrescan; amroutine->amgettuple = btgettuple; amroutine->amgetbitmap = btgetbitmap; amroutine->amendscan = btendscan; @@ -467,6 +491,175 @@ btbeginscan(Relation rel, int nkeys, int norderbys) } /* + * btestimateparallelscan - estimate storage for BTParallelScanDescData + */ +Size +btestimateparallelscan(Size index_size) +{ + return add_size(index_size, sizeof(BTParallelScanDescData)); +} + +/* + * btinitparallelscan - Initializing BTParallelScanDesc for parallel btree scan + */ +void +btinitparallelscan(void *target) +{ + BTParallelScanDesc bt_target = (BTParallelScanDesc) target; + + SpinLockInit(&bt_target->ps_mutex); + bt_target->ps_nextPage = InvalidBlockNumber; + bt_target->ps_pageStatus = BTPARALLEL_NOT_INITIALIZED; + bt_target->ps_arrayKeyCount = 0; + ConditionVariableInit(&bt_target->cv); +} + +/* + * _bt_parallel_seize() -- returns the next block to be scanned for forward + * scans and latest block scanned for backward scans. 
+ * + * status - True indicates that the block number returned is valid and the + * scan is continuing, or the block number is invalid and the scan has just + * begun, or the block number is P_NONE and the scan is finished. False + * indicates that we have reached the end of the scan for the current + * scankeys, in which case the block number returned is P_NONE. + * + * The first time the master backend or a worker hits the last page, it will + * return P_NONE with status 'True'; after that, any worker trying to fetch + * the next page will get status 'False'. + * + * Callers should ignore the return value if status is false. + */ +BlockNumber +_bt_parallel_seize(IndexScanDesc scan, bool *status) +{ + BTScanOpaque so = (BTScanOpaque) scan->opaque; + uint8 pageStatus; + bool exit_loop = false; + BlockNumber nextPage = InvalidBlockNumber; + BTParallelScanDesc btscan = (BTParallelScanDesc) OffsetToPointer( + (void *) scan->parallel_scan, + scan->parallel_scan->ps_offset); + + *status = true; + while (1) + { + SpinLockAcquire(&btscan->ps_mutex); + pageStatus = btscan->ps_pageStatus; + if (so->arrayKeyCount < btscan->ps_arrayKeyCount) + *status = false; + else if (pageStatus == BTPARALLEL_DONE) + *status = false; + else if (pageStatus != BTPARALLEL_ADVANCING) + { + btscan->ps_pageStatus = BTPARALLEL_ADVANCING; + nextPage = btscan->ps_nextPage; + exit_loop = true; + } + SpinLockRelease(&btscan->ps_mutex); + if (exit_loop || *status == false) + break; + ConditionVariableSleep(&btscan->cv, WAIT_EVENT_PARALLEL_INDEX_SCAN); + } + ConditionVariableCancelSleep(); + + /* no more pages to scan */ + if (*status == false) + return P_NONE; + + *status = true; + return nextPage; +} + +/* + * _bt_parallel_release() -- Advances the parallel scan to allow scan of next + * page + * + * It updates the next page value, which allows the parallel scan to move + * forward or backward depending on the scan direction, and wakes up one of + * the sleeping workers. + * + * For backward scan, next_page holds the latest page being scanned. + * For forward scan, next_page holds the next page to be scanned. + */ +void +_bt_parallel_release(IndexScanDesc scan, BlockNumber next_page) +{ + BTParallelScanDesc btscan = (BTParallelScanDesc) OffsetToPointer( + (void *) scan->parallel_scan, + scan->parallel_scan->ps_offset); + + SpinLockAcquire(&btscan->ps_mutex); + btscan->ps_nextPage = next_page; + btscan->ps_pageStatus = BTPARALLEL_IDLE; + SpinLockRelease(&btscan->ps_mutex); + ConditionVariableSignal(&btscan->cv); +} + +/* + * _bt_parallel_done() -- Finishes the parallel scan + * + * This must be called when there are no pages left to scan. It notifies all + * the other workers associated with this scan that the parallel scan has + * ended. + */ +void +_bt_parallel_done(IndexScanDesc scan) +{ + BTScanOpaque so = (BTScanOpaque) scan->opaque; + BTParallelScanDesc btscan; + bool status_changed = false; + + /* Do nothing for non-parallel scans */ + if (scan->parallel_scan == NULL) + return; + + btscan = (BTParallelScanDesc) OffsetToPointer((void *) scan->parallel_scan, + scan->parallel_scan->ps_offset); + + /* + * Ensure that the parallel scan is marked as done no more than once per + * scan. We rely on this state to initiate the next scan for multiple array + * keys, see _bt_advance_array_keys. 
+ */ + SpinLockAcquire(&btscan->ps_mutex); + if (so->arrayKeyCount >= btscan->ps_arrayKeyCount && + btscan->ps_pageStatus != BTPARALLEL_DONE) + { + btscan->ps_pageStatus = BTPARALLEL_DONE; + status_changed = true; + } + SpinLockRelease(&btscan->ps_mutex); + + /* wake up all the workers associated with this parallel scan */ + if (status_changed) + ConditionVariableBroadcast(&btscan->cv); +} + +/* + * _bt_parallel_advance_scan() -- Advances the parallel scan + * + * It updates the count of array keys processed for both local and parallel + * scans. + */ +void +_bt_parallel_advance_scan(IndexScanDesc scan) +{ + BTScanOpaque so = (BTScanOpaque) scan->opaque; + BTParallelScanDesc btscan = (BTParallelScanDesc) OffsetToPointer( + (void *) scan->parallel_scan, + scan->parallel_scan->ps_offset); + + so->arrayKeyCount++; + SpinLockAcquire(&btscan->ps_mutex); + if (btscan->ps_pageStatus == BTPARALLEL_DONE) + { + btscan->ps_nextPage = InvalidBlockNumber; + btscan->ps_pageStatus = BTPARALLEL_NOT_INITIALIZED; + btscan->ps_arrayKeyCount++; + } + SpinLockRelease(&btscan->ps_mutex); +} + +/* * btrescan() -- rescan an index relation */ void @@ -486,6 +679,7 @@ btrescan(IndexScanDesc scan, ScanKey scankey, int nscankeys, } so->markItemIndex = -1; + so->arrayKeyCount = 0; BTScanPosUnpinIfPinned(so->markPos); BTScanPosInvalidate(so->markPos); @@ -526,6 +720,31 @@ btrescan(IndexScanDesc scan, ScanKey scankey, int nscankeys, } /* + * btparallelrescan() -- reset parallel scan + */ +void +btparallelrescan(IndexScanDesc scan) +{ + if (scan->parallel_scan) + { + BTParallelScanDesc btscan; + + btscan = (BTParallelScanDesc) OffsetToPointer( + (void *) scan->parallel_scan, + scan->parallel_scan->ps_offset); + + /* + * Ideally, we don't need to acquire spinlock here, but being + * consistent with heap_rescan seems to be a good idea. + */ + SpinLockAcquire(&btscan->ps_mutex); + btscan->ps_nextPage = InvalidBlockNumber; + btscan->ps_pageStatus = BTPARALLEL_NOT_INITIALIZED; + SpinLockRelease(&btscan->ps_mutex); + } +} + +/* * btendscan() -- close down a scan */ void diff --git a/src/backend/access/nbtree/nbtsearch.c b/src/backend/access/nbtree/nbtsearch.c index ee46023..d9e0a5f 100644 --- a/src/backend/access/nbtree/nbtsearch.c +++ b/src/backend/access/nbtree/nbtsearch.c @@ -30,9 +30,12 @@ static bool _bt_readpage(IndexScanDesc scan, ScanDirection dir, static void _bt_saveitem(BTScanOpaque so, int itemIndex, OffsetNumber offnum, IndexTuple itup); static bool _bt_steppage(IndexScanDesc scan, ScanDirection dir); +static bool _bt_readnextpage(IndexScanDesc scan, BlockNumber blkno, ScanDirection dir); +static bool _bt_parallel_readpage(IndexScanDesc scan, BlockNumber blkno, ScanDirection dir); static Buffer _bt_walk_left(Relation rel, Buffer buf, Snapshot snapshot); static bool _bt_endpoint(IndexScanDesc scan, ScanDirection dir); static void _bt_drop_lock_and_maybe_pin(IndexScanDesc scan, BTScanPos sp); +static inline void _bt_initialize_more_data(BTScanOpaque so, ScanDirection dir); /* @@ -544,8 +547,10 @@ _bt_first(IndexScanDesc scan, ScanDirection dir) ScanKeyData notnullkeys[INDEX_MAX_KEYS]; int keysCount = 0; int i; + bool status = true; StrategyNumber strat_total; BTScanPosItem *currItem; + BlockNumber blkno; Assert(!BTScanPosIsValid(so->currPos)); @@ -564,6 +569,38 @@ _bt_first(IndexScanDesc scan, ScanDirection dir) if (!so->qual_ok) return false; + /* + * For parallel scans, get the page from shared state. 
If the scan has not + * started, find the first leaf page to scan, keeping the other workers + * waiting until we have descended to the appropriate leaf page to be + * scanned for matching tuples. + * + * If the scan has already begun, skip the descent and directly scan the + * page stored in the shared structure, or the page to its left in the case + * of a backward scan. + */ + if (scan->parallel_scan != NULL) + { + blkno = _bt_parallel_seize(scan, &status); + if (status == false) + { + BTScanPosInvalidate(so->currPos); + return false; + } + else if (blkno == P_NONE) + { + _bt_parallel_done(scan); + BTScanPosInvalidate(so->currPos); + return false; + } + else if (blkno != InvalidBlockNumber) + { + if (!_bt_parallel_readpage(scan, blkno, dir)) + return false; + goto readcomplete; + } + } + /*---------- * Examine the scan keys to discover where we need to start the scan. * @@ -743,7 +780,19 @@ _bt_first(IndexScanDesc scan, ScanDirection dir) * there. */ if (keysCount == 0) - return _bt_endpoint(scan, dir); + { + bool match; + + match = _bt_endpoint(scan, dir); + if (!match) + { + /* No match, indicate (parallel) scan finished */ + _bt_parallel_done(scan); + BTScanPosInvalidate(so->currPos); + } + + return match; + } /* * We want to start the scan somewhere within the index. Set up an @@ -993,25 +1042,21 @@ _bt_first(IndexScanDesc scan, ScanDirection dir) * because nothing finer to lock exists. */ PredicateLockRelation(rel, scan->xs_snapshot); + + /* + * mark parallel scan as done, so that all the workers can finish + * their scan + */ + _bt_parallel_done(scan); + BTScanPosInvalidate(so->currPos); + return false; } else PredicateLockPage(rel, BufferGetBlockNumber(buf), scan->xs_snapshot); - /* initialize moreLeft/moreRight appropriately for scan direction */ - if (ScanDirectionIsForward(dir)) - { - so->currPos.moreLeft = false; - so->currPos.moreRight = true; - } - else - { - so->currPos.moreLeft = true; - so->currPos.moreRight = false; - } - so->numKilled = 0; /* just paranoia */ - Assert(so->markItemIndex == -1); + _bt_initialize_more_data(so, dir); /* position to the precise item on the page */ offnum = _bt_binsrch(rel, buf, keysCount, scankeys, nextkey); @@ -1060,6 +1105,7 @@ _bt_first(IndexScanDesc scan, ScanDirection dir) _bt_drop_lock_and_maybe_pin(scan, &so->currPos); } +readcomplete: /* OK, itemIndex says what to return */ currItem = &so->currPos.items[so->currPos.itemIndex]; scan->xs_ctup.t_self = currItem->heapTid; @@ -1154,6 +1200,16 @@ _bt_readpage(IndexScanDesc scan, ScanDirection dir, OffsetNumber offnum) page = BufferGetPage(so->currPos.buf); opaque = (BTPageOpaque) PageGetSpecialPointer(page); + + /* allow the next page to be processed by a parallel worker */ + if (scan->parallel_scan) + { + if (ScanDirectionIsForward(dir)) + _bt_parallel_release(scan, opaque->btpo_next); + else + _bt_parallel_release(scan, BufferGetBlockNumber(so->currPos.buf)); + } + minoff = P_FIRSTDATAKEY(opaque); maxoff = PageGetMaxOffsetNumber(page); @@ -1278,21 +1334,16 @@ _bt_saveitem(BTScanOpaque so, int itemIndex, * if pinned, we'll drop the pin before moving to next page. The buffer is * not locked on entry. * - * On success exit, so->currPos is updated to contain data from the next - * interesting page. For success on a scan using a non-MVCC snapshot we hold - * a pin, but not a read lock, on that page. If we do not hold the pin, we - * set so->currPos.buf to InvalidBuffer. We return TRUE to indicate success. 
- * - * If there are no more matching records in the given direction, we drop all - * locks and pins, set so->currPos.buf to InvalidBuffer, and return FALSE. + * For success on a scan using a non-MVCC snapshot we hold a pin, but not a + * read lock, on that page. If we do not hold the pin, we set so->currPos.buf + * to InvalidBuffer. We return TRUE to indicate success. */ static bool _bt_steppage(IndexScanDesc scan, ScanDirection dir) { BTScanOpaque so = (BTScanOpaque) scan->opaque; - Relation rel; - Page page; - BTPageOpaque opaque; + BlockNumber blkno = InvalidBlockNumber; + bool status = true; Assert(BTScanPosIsValid(so->currPos)); @@ -1319,13 +1370,27 @@ _bt_steppage(IndexScanDesc scan, ScanDirection dir) so->markItemIndex = -1; } - rel = scan->indexRelation; - if (ScanDirectionIsForward(dir)) { /* Walk right to the next page with data */ - /* We must rely on the previously saved nextPage link! */ - BlockNumber blkno = so->currPos.nextPage; + + /* + * We must rely on the previously saved nextPage link for non-parallel + * scans! + */ + if (scan->parallel_scan != NULL) + { + blkno = _bt_parallel_seize(scan, &status); + if (status == false) + { + /* release the previous buffer, if pinned */ + BTScanPosUnpinIfPinned(so->currPos); + BTScanPosInvalidate(so->currPos); + return false; + } + } + else + blkno = so->currPos.nextPage; /* Remember we left a page with data */ so->currPos.moreLeft = true; @@ -1333,11 +1398,68 @@ _bt_steppage(IndexScanDesc scan, ScanDirection dir) /* release the previous buffer, if pinned */ BTScanPosUnpinIfPinned(so->currPos); + if (!_bt_readnextpage(scan, blkno, dir)) + return false; + } + else + { + /* Remember we left a page with data */ + so->currPos.moreRight = true; + + /* For parallel scans, get the last page scanned */ + if (scan->parallel_scan != NULL) + { + blkno = _bt_parallel_seize(scan, &status); + BTScanPosUnpinIfPinned(so->currPos); + if (status == false) + { + BTScanPosInvalidate(so->currPos); + return false; + } + } + + if (!_bt_readnextpage(scan, blkno, dir)) + return false; + } + + /* Drop the lock, and maybe the pin, on the current page */ + _bt_drop_lock_and_maybe_pin(scan, &so->currPos); + + return true; +} + +/* + * _bt_readnextpage() -- Read next page containing valid data for scan + * + * On success exit, so->currPos is updated to contain data from the next + * interesting page. Caller is responsible to release lock and pin on + * buffer on success. We return TRUE to indicate success. + * + * If there are no more matching records in the given direction, we drop all + * locks and pins, set so->currPos.buf to InvalidBuffer, and return FALSE. 
+ */ +static bool +_bt_readnextpage(IndexScanDesc scan, BlockNumber blkno, ScanDirection dir) +{ + BTScanOpaque so = (BTScanOpaque) scan->opaque; + Relation rel; + Page page; + BTPageOpaque opaque; + bool status = true; + + rel = scan->indexRelation; + + if (ScanDirectionIsForward(dir)) + { for (;;) { - /* if we're at end of scan, give up */ + /* + * if we're at end of scan, give up and mark parallel scan as + * done, so that all the workers can finish their scan + */ if (blkno == P_NONE || !so->currPos.moreRight) { + _bt_parallel_done(scan); BTScanPosInvalidate(so->currPos); return false; } @@ -1345,10 +1467,10 @@ _bt_steppage(IndexScanDesc scan, ScanDirection dir) CHECK_FOR_INTERRUPTS(); /* step right one page */ so->currPos.buf = _bt_getbuf(rel, blkno, BT_READ); - /* check for deleted page */ page = BufferGetPage(so->currPos.buf); TestForOldSnapshot(scan->xs_snapshot, rel, page); opaque = (BTPageOpaque) PageGetSpecialPointer(page); + /* check for deleted page */ if (!P_IGNORE(opaque)) { PredicateLockPage(rel, blkno, scan->xs_snapshot); @@ -1359,14 +1481,30 @@ _bt_steppage(IndexScanDesc scan, ScanDirection dir) } /* nope, keep going */ - blkno = opaque->btpo_next; + if (scan->parallel_scan != NULL) + { + blkno = _bt_parallel_seize(scan, &status); + if (status == false) + { + _bt_relbuf(rel, so->currPos.buf); + BTScanPosInvalidate(so->currPos); + return false; + } + } + else + blkno = opaque->btpo_next; _bt_relbuf(rel, so->currPos.buf); } } else { - /* Remember we left a page with data */ - so->currPos.moreRight = true; + /* + * for parallel scans, current block number needs to be retrieved from + * shared state and it is the responsibility of caller to pass the + * correct block number. + */ + if (blkno != InvalidBlockNumber) + so->currPos.currPage = blkno; /* * Walk left to the next page with data. This is much more complex @@ -1401,6 +1539,12 @@ _bt_steppage(IndexScanDesc scan, ScanDirection dir) if (!so->currPos.moreLeft) { _bt_relbuf(rel, so->currPos.buf); + + /* + * mark parallel scan as done, so that all the workers can + * finish their scan + */ + _bt_parallel_done(scan); BTScanPosInvalidate(so->currPos); return false; } @@ -1412,6 +1556,7 @@ _bt_steppage(IndexScanDesc scan, ScanDirection dir) /* if we're physically at end of index, return failure */ if (so->currPos.buf == InvalidBuffer) { + _bt_parallel_done(scan); BTScanPosInvalidate(so->currPos); return false; } @@ -1432,9 +1577,48 @@ _bt_steppage(IndexScanDesc scan, ScanDirection dir) if (_bt_readpage(scan, dir, PageGetMaxOffsetNumber(page))) break; } + + /* + * For parallel scans, get the last page scanned as it is quite + * possible that by the time we try to fetch previous page, other + * worker has also decided to scan that previous page. We could + * avoid that by doing _bt_parallel_release once we have read the + * current page, but it is bad to make other workers wait till we + * read the page. + */ + if (scan->parallel_scan != NULL) + { + _bt_relbuf(rel, so->currPos.buf); + blkno = _bt_parallel_seize(scan, &status); + if (status == false) + { + BTScanPosInvalidate(so->currPos); + return false; + } + so->currPos.buf = _bt_getbuf(rel, blkno, BT_READ); + } } } + return true; +} + +/* + * _bt_parallel_readpage() -- Read current page containing valid data for scan + * + * On success, release lock and pin on buffer. We return TRUE to indicate + * success. 
+ */ +static bool +_bt_parallel_readpage(IndexScanDesc scan, BlockNumber blkno, ScanDirection dir) +{ + BTScanOpaque so = (BTScanOpaque) scan->opaque; + + _bt_initialize_more_data(so, dir); + + if (!_bt_readnextpage(scan, blkno, dir)) + return false; + /* Drop the lock, and maybe the pin, on the current page */ _bt_drop_lock_and_maybe_pin(scan, &so->currPos); @@ -1712,19 +1896,7 @@ _bt_endpoint(IndexScanDesc scan, ScanDirection dir) /* remember which buffer we have pinned */ so->currPos.buf = buf; - /* initialize moreLeft/moreRight appropriately for scan direction */ - if (ScanDirectionIsForward(dir)) - { - so->currPos.moreLeft = false; - so->currPos.moreRight = true; - } - else - { - so->currPos.moreLeft = true; - so->currPos.moreRight = false; - } - so->numKilled = 0; /* just paranoia */ - so->markItemIndex = -1; /* ditto */ + _bt_initialize_more_data(so, dir); /* * Now load data from the first page of the scan. @@ -1753,3 +1925,25 @@ _bt_endpoint(IndexScanDesc scan, ScanDirection dir) return true; } + +/* + * _bt_initialize_more_data() -- initialize moreLeft/moreRight appropriately + * for scan direction + */ +static inline void +_bt_initialize_more_data(BTScanOpaque so, ScanDirection dir) +{ + /* initialize moreLeft/moreRight appropriately for scan direction */ + if (ScanDirectionIsForward(dir)) + { + so->currPos.moreLeft = false; + so->currPos.moreRight = true; + } + else + { + so->currPos.moreLeft = true; + so->currPos.moreRight = false; + } + so->numKilled = 0; /* just paranoia */ + so->markItemIndex = -1; /* ditto */ +} diff --git a/src/backend/access/nbtree/nbtutils.c b/src/backend/access/nbtree/nbtutils.c index 063c988..15ea6df 100644 --- a/src/backend/access/nbtree/nbtutils.c +++ b/src/backend/access/nbtree/nbtutils.c @@ -590,6 +590,10 @@ _bt_advance_array_keys(IndexScanDesc scan, ScanDirection dir) break; } + /* advance parallel scan */ + if (scan->parallel_scan != NULL) + _bt_parallel_advance_scan(scan); + return found; } diff --git a/src/backend/access/spgist/spgutils.c b/src/backend/access/spgist/spgutils.c index d570ae5..4f39e93 100644 --- a/src/backend/access/spgist/spgutils.c +++ b/src/backend/access/spgist/spgutils.c @@ -60,8 +60,11 @@ spghandler(PG_FUNCTION_ARGS) amroutine->amoptions = spgoptions; amroutine->amproperty = NULL; amroutine->amvalidate = spgvalidate; + amroutine->amestimateparallelscan = NULL; + amroutine->aminitparallelscan = NULL; amroutine->ambeginscan = spgbeginscan; amroutine->amrescan = spgrescan; + amroutine->amparallelrescan = NULL; amroutine->amgettuple = spggettuple; amroutine->amgetbitmap = spggetbitmap; amroutine->amendscan = spgendscan; diff --git a/src/backend/commands/cluster.c b/src/backend/commands/cluster.c index 2131226..02ca783 100644 --- a/src/backend/commands/cluster.c +++ b/src/backend/commands/cluster.c @@ -903,7 +903,7 @@ copy_heap_data(Oid OIDNewHeap, Oid OIDOldHeap, Oid OIDOldIndex, bool verbose, if (OldIndex != NULL && !use_sort) { heapScan = NULL; - indexScan = index_beginscan(OldHeap, OldIndex, SnapshotAny, 0, 0); + indexScan = index_beginscan(OldHeap, OldIndex, SnapshotAny, NULL, 0, 0, false); index_rescan(indexScan, NULL, 0, NULL, 0); } else diff --git a/src/backend/executor/execIndexing.c b/src/backend/executor/execIndexing.c index 009c1b7..f8e662b 100644 --- a/src/backend/executor/execIndexing.c +++ b/src/backend/executor/execIndexing.c @@ -722,7 +722,7 @@ check_exclusion_or_unique_constraint(Relation heap, Relation index, retry: conflict = false; found_self = false; - index_scan = index_beginscan(heap, index, 
&DirtySnapshot, index_natts, 0); + index_scan = index_beginscan(heap, index, &DirtySnapshot, NULL, index_natts, 0, false); index_rescan(index_scan, scankeys, index_natts, NULL, 0); while ((tup = index_getnext(index_scan, diff --git a/src/backend/executor/nodeBitmapIndexscan.c b/src/backend/executor/nodeBitmapIndexscan.c index a364098..43c610c 100644 --- a/src/backend/executor/nodeBitmapIndexscan.c +++ b/src/backend/executor/nodeBitmapIndexscan.c @@ -26,6 +26,7 @@ #include "executor/nodeIndexscan.h" #include "miscadmin.h" #include "utils/memutils.h" +#include "access/relscan.h" /* ---------------------------------------------------------------- @@ -48,6 +49,7 @@ MultiExecBitmapIndexScan(BitmapIndexScanState *node) * extract necessary information from index scan node */ scandesc = node->biss_ScanDesc; + scandesc->parallel_scan = NULL; /* * If we have runtime keys and they've not already been set up, do it now. diff --git a/src/backend/executor/nodeIndexonlyscan.c b/src/backend/executor/nodeIndexonlyscan.c index 4f6f91c..45566bd 100644 --- a/src/backend/executor/nodeIndexonlyscan.c +++ b/src/backend/executor/nodeIndexonlyscan.c @@ -540,9 +540,9 @@ ExecInitIndexOnlyScan(IndexOnlyScan *node, EState *estate, int eflags) */ indexstate->ioss_ScanDesc = index_beginscan(currentRelation, indexstate->ioss_RelationDesc, - estate->es_snapshot, + estate->es_snapshot, NULL, indexstate->ioss_NumScanKeys, - indexstate->ioss_NumOrderByKeys); + indexstate->ioss_NumOrderByKeys, false); /* Set it up for index-only scan */ indexstate->ioss_ScanDesc->xs_want_itup = true; diff --git a/src/backend/executor/nodeIndexscan.c b/src/backend/executor/nodeIndexscan.c index 3143bd9..77d2990 100644 --- a/src/backend/executor/nodeIndexscan.c +++ b/src/backend/executor/nodeIndexscan.c @@ -1022,9 +1022,9 @@ ExecInitIndexScan(IndexScan *node, EState *estate, int eflags) */ indexstate->iss_ScanDesc = index_beginscan(currentRelation, indexstate->iss_RelationDesc, - estate->es_snapshot, + estate->es_snapshot, NULL, indexstate->iss_NumScanKeys, - indexstate->iss_NumOrderByKeys); + indexstate->iss_NumOrderByKeys, false); /* * If no run-time keys to calculate, go ahead and pass the scankeys to the diff --git a/src/backend/postmaster/pgstat.c b/src/backend/postmaster/pgstat.c index 61e6a2c..1bbeec6 100644 --- a/src/backend/postmaster/pgstat.c +++ b/src/backend/postmaster/pgstat.c @@ -3386,6 +3386,9 @@ pgstat_get_wait_ipc(WaitEventIPC w) case WAIT_EVENT_PARALLEL_FINISH: event_name = "ParallelFinish"; break; + case WAIT_EVENT_PARALLEL_INDEX_SCAN: + event_name = "ParallelIndexScan"; + break; case WAIT_EVENT_SAFE_SNAPSHOT: event_name = "SafeSnapshot"; break; diff --git a/src/backend/utils/adt/selfuncs.c b/src/backend/utils/adt/selfuncs.c index 4973396..9385325 100644 --- a/src/backend/utils/adt/selfuncs.c +++ b/src/backend/utils/adt/selfuncs.c @@ -5143,8 +5143,8 @@ get_actual_variable_range(PlannerInfo *root, VariableStatData *vardata, * extreme value has been deleted; that case motivates not * using SnapshotAny here. 
*/ - index_scan = index_beginscan(heapRel, indexRel, &SnapshotDirty, - 1, 0); + index_scan = index_beginscan(heapRel, indexRel, &SnapshotDirty, NULL, + 1, 0, false); index_rescan(index_scan, scankeys, 1, NULL, 0); /* Fetch first tuple in sortop's direction */ @@ -5175,8 +5175,8 @@ get_actual_variable_range(PlannerInfo *root, VariableStatData *vardata, /* If max is requested, and we didn't find the index is empty */ if (max && have_data) { - index_scan = index_beginscan(heapRel, indexRel, &SnapshotDirty, - 1, 0); + index_scan = index_beginscan(heapRel, indexRel, &SnapshotDirty, NULL, + 1, 0, false); index_rescan(index_scan, scankeys, 1, NULL, 0); /* Fetch first tuple in reverse direction */ diff --git a/src/include/access/amapi.h b/src/include/access/amapi.h index 1036cca..e777678 100644 --- a/src/include/access/amapi.h +++ b/src/include/access/amapi.h @@ -108,6 +108,12 @@ typedef bool (*amproperty_function) (Oid index_oid, int attno, /* validate definition of an opclass for this AM */ typedef bool (*amvalidate_function) (Oid opclassoid); +/* estimate size of parallel scan descriptor */ +typedef Size (*amestimateparallelscan_function) (Size index_size); + +/* prepare for parallel index scan */ +typedef void (*aminitparallelscan_function) (void *target); + /* prepare for index scan */ typedef IndexScanDesc (*ambeginscan_function) (Relation indexRelation, int nkeys, @@ -120,6 +126,9 @@ typedef void (*amrescan_function) (IndexScanDesc scan, ScanKey orderbys, int norderbys); +/* (re)start parallel index scan */ +typedef void (*amparallelrescan_function) (IndexScanDesc scan); + /* next valid tuple */ typedef bool (*amgettuple_function) (IndexScanDesc scan, ScanDirection direction); @@ -189,8 +198,11 @@ typedef struct IndexAmRoutine amoptions_function amoptions; amproperty_function amproperty; /* can be NULL */ amvalidate_function amvalidate; + amestimateparallelscan_function amestimateparallelscan; /* can be NULL */ + aminitparallelscan_function aminitparallelscan; /* can be NULL */ ambeginscan_function ambeginscan; amrescan_function amrescan; + amparallelrescan_function amparallelrescan; /* can be NULL */ amgettuple_function amgettuple; /* can be NULL */ amgetbitmap_function amgetbitmap; /* can be NULL */ amendscan_function amendscan; diff --git a/src/include/access/genam.h b/src/include/access/genam.h index 81907d5..4410ea3 100644 --- a/src/include/access/genam.h +++ b/src/include/access/genam.h @@ -83,6 +83,8 @@ typedef bool (*IndexBulkDeleteCallback) (ItemPointer itemptr, void *state); typedef struct IndexScanDescData *IndexScanDesc; typedef struct SysScanDescData *SysScanDesc; +typedef struct ParallelIndexScanDescData *ParallelIndexScanDesc; + /* * Enumeration specifying the type of uniqueness check to perform in * index_insert(). 
@@ -131,16 +133,23 @@ extern bool index_insert(Relation indexRelation, Relation heapRelation, IndexUniqueCheck checkUnique); +extern Size index_parallelscan_estimate(Relation indexrel, Snapshot snapshot); +extern void index_parallelscan_initialize(Relation heaprel, Relation indexrel, Snapshot snapshot, ParallelIndexScanDesc target); +extern IndexScanDesc index_beginscan_parallel(Relation heaprel, Relation indexrel, int nkeys, + int norderbys, ParallelIndexScanDesc pscan); + extern IndexScanDesc index_beginscan(Relation heapRelation, Relation indexRelation, Snapshot snapshot, - int nkeys, int norderbys); + ParallelIndexScanDesc pscan, + int nkeys, int norderbys, bool temp_snap); extern IndexScanDesc index_beginscan_bitmap(Relation indexRelation, Snapshot snapshot, int nkeys); extern void index_rescan(IndexScanDesc scan, ScanKey keys, int nkeys, ScanKey orderbys, int norderbys); +extern void index_parallelrescan(IndexScanDesc scan); extern void index_endscan(IndexScanDesc scan); extern void index_markpos(IndexScanDesc scan); extern void index_restrpos(IndexScanDesc scan); diff --git a/src/include/access/nbtree.h b/src/include/access/nbtree.h index c580f51..1c8aa8a 100644 --- a/src/include/access/nbtree.h +++ b/src/include/access/nbtree.h @@ -609,6 +609,8 @@ typedef struct BTScanOpaqueData ScanKey arrayKeyData; /* modified copy of scan->keyData */ int numArrayKeys; /* number of equality-type array keys (-1 if * there are any unsatisfiable array keys) */ + int arrayKeyCount; /* count indicating number of array scan keys + * processed */ BTArrayKeyInfo *arrayKeys; /* info about each equality-type array key */ MemoryContext arrayContext; /* scan-lifespan context for array data */ @@ -652,7 +654,25 @@ typedef BTScanOpaqueData *BTScanOpaque; #define SK_BT_NULLS_FIRST (INDOPTION_NULLS_FIRST << SK_BT_INDOPTION_SHIFT) /* - * prototypes for functions in nbtree.c (external entry points for btree) + * Below flags are used to indicate the state of parallel scan. + * + * BTPARALLEL_NOT_INITIALIZED implies that the scan is not started + * + * BTPARALLEL_ADVANCING implies one of the worker or backend is advancing the + * scan to a new page; others must wait. 
+ * + * BTPARALLEL_IDLE implies that no backend is advancing the scan; someone can + * start doing it + * + * BTPARALLEL_DONE implies that the scan is complete (including error exit) + */ +#define BTPARALLEL_NOT_INITIALIZED 0x01 +#define BTPARALLEL_ADVANCING 0x02 +#define BTPARALLEL_DONE 0x03 +#define BTPARALLEL_IDLE 0x04 +/* + * prototypes for functions in nbtree.c (external entry points for btree and + * functions to maintain state of parallel scan) */ extern Datum bthandler(PG_FUNCTION_ARGS); extern IndexBuildResult *btbuild(Relation heap, Relation index, @@ -662,10 +682,17 @@ extern bool btinsert(Relation rel, Datum *values, bool *isnull, ItemPointer ht_ctid, Relation heapRel, IndexUniqueCheck checkUnique); extern IndexScanDesc btbeginscan(Relation rel, int nkeys, int norderbys); +extern Size btestimateparallelscan(Size size); +extern void btinitparallelscan(void *target); +extern BlockNumber _bt_parallel_seize(IndexScanDesc scan, bool *status); +extern void _bt_parallel_release(IndexScanDesc scan, BlockNumber next_page); +extern void _bt_parallel_done(IndexScanDesc scan); +extern void _bt_parallel_advance_scan(IndexScanDesc scan); extern bool btgettuple(IndexScanDesc scan, ScanDirection dir); extern int64 btgetbitmap(IndexScanDesc scan, TIDBitmap *tbm); extern void btrescan(IndexScanDesc scan, ScanKey scankey, int nscankeys, ScanKey orderbys, int norderbys); +extern void btparallelrescan(IndexScanDesc scan); extern void btendscan(IndexScanDesc scan); extern void btmarkpos(IndexScanDesc scan); extern void btrestrpos(IndexScanDesc scan); diff --git a/src/include/access/relscan.h b/src/include/access/relscan.h index de98dd6..ca843f9 100644 --- a/src/include/access/relscan.h +++ b/src/include/access/relscan.h @@ -21,6 +21,9 @@ #include "access/tupdesc.h" #include "storage/spin.h" +#define OffsetToPointer(base, offset)\ +((void *)((char *)base + offset)) + /* * Shared state for parallel heap scan. * @@ -126,8 +129,20 @@ typedef struct IndexScanDescData /* state data for traversing HOT chains in index_getnext */ bool xs_continue_hot; /* T if must keep walking HOT chain */ + bool xs_temp_snap; /* unregister snapshot at scan end? */ + ParallelIndexScanDesc parallel_scan; /* parallel index scan + * information */ } IndexScanDescData; +/* Generic structure for parallel scans */ +typedef struct ParallelIndexScanDescData +{ + Oid ps_relid; + Oid ps_indexid; + Size ps_offset; /* Offset in bytes of am specific structure */ + char ps_snapshot_data[FLEXIBLE_ARRAY_MEMBER]; +} ParallelIndexScanDescData; + /* Struct for heap-or-index scans of system tables */ typedef struct SysScanDescData { diff --git a/src/include/pgstat.h b/src/include/pgstat.h index 282f8ae..f038f64 100644 --- a/src/include/pgstat.h +++ b/src/include/pgstat.h @@ -784,6 +784,7 @@ typedef enum WAIT_EVENT_MQ_RECEIVE, WAIT_EVENT_MQ_SEND, WAIT_EVENT_PARALLEL_FINISH, + WAIT_EVENT_PARALLEL_INDEX_SCAN, WAIT_EVENT_SAFE_SNAPSHOT, WAIT_EVENT_SYNC_REP } WaitEventIPC; diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list index 993880d..0192810 100644 --- a/src/tools/pgindent/typedefs.list +++ b/src/tools/pgindent/typedefs.list @@ -161,6 +161,7 @@ BTPageOpaque BTPageOpaqueData BTPageStat BTPageState +BTParallelScanDesc BTScanOpaque BTScanOpaqueData BTScanPos @@ -1264,6 +1265,8 @@ OverrideSearchPath OverrideStackEntry PACE_HEADER PACL +ParallelIndexScanDesc +ParallelIndexScanDescData PATH PBOOL PCtxtHandle
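[Editor's note, not part of the patch: a rough sketch of the intended caller-side sequence for the new indexam.c entry points added above. The helper shm_alloc() and the variables heaprel, indexrel, snapshot, scankeys, orderbys, nkeys and norderbys are placeholders; in the executor the shared descriptor would live in the parallel DSM segment rather than being allocated directly.]

/* Leader: size and initialize the shared parallel index scan descriptor */
Size        size = index_parallelscan_estimate(indexrel, snapshot);
ParallelIndexScanDesc pscan = (ParallelIndexScanDesc) shm_alloc(size);

index_parallelscan_initialize(heaprel, indexrel, snapshot, pscan);

/* Leader and each worker: attach to the shared scan and fetch tuples */
IndexScanDesc scan = index_beginscan_parallel(heaprel, indexrel,
                                              nkeys, norderbys, pscan);
index_rescan(scan, scankeys, nkeys, orderbys, norderbys);

while (index_getnext(scan, ForwardScanDirection) != NULL)
{
    /* process the returned heap tuple */
}

/*
 * On a rescan (e.g. the inner side of a nested-loop join), the shared
 * state is reset via index_parallelrescan() before the workers restart
 * their scans.
 */
index_parallelrescan(scan);

index_endscan(scan);    /* also unregisters the temporary snapshot */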