diff --git a/doc/src/sgml/indexam.sgml b/doc/src/sgml/indexam.sgml
index 40f201b..8d4f9f7 100644
--- a/doc/src/sgml/indexam.sgml
+++ b/doc/src/sgml/indexam.sgml
@@ -77,61 +77,64 @@
The structure IndexAmRoutine is defined thus:
-typedef struct IndexAmRoutine
-{
- NodeTag type;
-
- /*
- * Total number of strategies (operators) by which we can traverse/search
- * this AM. Zero if AM does not have a fixed set of strategy assignments.
- */
- uint16 amstrategies;
- /* total number of support functions that this AM uses */
- uint16 amsupport;
- /* does AM support ORDER BY indexed column's value? */
- bool amcanorder;
- /* does AM support ORDER BY result of an operator on indexed column? */
- bool amcanorderbyop;
- /* does AM support backward scanning? */
- bool amcanbackward;
- /* does AM support UNIQUE indexes? */
- bool amcanunique;
- /* does AM support multi-column indexes? */
- bool amcanmulticol;
- /* does AM require scans to have a constraint on the first index column? */
- bool amoptionalkey;
- /* does AM handle ScalarArrayOpExpr quals? */
- bool amsearcharray;
- /* does AM handle IS NULL/IS NOT NULL quals? */
- bool amsearchnulls;
- /* can index storage data type differ from column data type? */
- bool amstorage;
- /* can an index of this type be clustered on? */
- bool amclusterable;
- /* does AM handle predicate locks? */
- bool ampredlocks;
- /* type of data stored in index, or InvalidOid if variable */
- Oid amkeytype;
-
- /* interface functions */
- ambuild_function ambuild;
- ambuildempty_function ambuildempty;
- aminsert_function aminsert;
- ambulkdelete_function ambulkdelete;
- amvacuumcleanup_function amvacuumcleanup;
- amcanreturn_function amcanreturn; /* can be NULL */
- amcostestimate_function amcostestimate;
- amoptions_function amoptions;
- amproperty_function amproperty; /* can be NULL */
- amvalidate_function amvalidate;
- ambeginscan_function ambeginscan;
- amrescan_function amrescan;
- amgettuple_function amgettuple; /* can be NULL */
- amgetbitmap_function amgetbitmap; /* can be NULL */
- amendscan_function amendscan;
- ammarkpos_function ammarkpos; /* can be NULL */
- amrestrpos_function amrestrpos; /* can be NULL */
-} IndexAmRoutine;
+ typedef struct IndexAmRoutine
+ {
+ NodeTag type;
+
+ /*
+ * Total number of strategies (operators) by which we can traverse/search
+ * this AM. Zero if AM does not have a fixed set of strategy assignments.
+ */
+ uint16 amstrategies;
+ /* total number of support functions that this AM uses */
+ uint16 amsupport;
+ /* does AM support ORDER BY indexed column's value? */
+ bool amcanorder;
+ /* does AM support ORDER BY result of an operator on indexed column? */
+ bool amcanorderbyop;
+ /* does AM support backward scanning? */
+ bool amcanbackward;
+ /* does AM support UNIQUE indexes? */
+ bool amcanunique;
+ /* does AM support multi-column indexes? */
+ bool amcanmulticol;
+ /* does AM require scans to have a constraint on the first index column? */
+ bool amoptionalkey;
+ /* does AM handle ScalarArrayOpExpr quals? */
+ bool amsearcharray;
+ /* does AM handle IS NULL/IS NOT NULL quals? */
+ bool amsearchnulls;
+ /* can index storage data type differ from column data type? */
+ bool amstorage;
+ /* can an index of this type be clustered on? */
+ bool amclusterable;
+ /* does AM handle predicate locks? */
+ bool ampredlocks;
+ /* type of data stored in index, or InvalidOid if variable */
+ Oid amkeytype;
+
+ /* interface functions */
+ ambuild_function ambuild;
+ ambuildempty_function ambuildempty;
+ aminsert_function aminsert;
+ ambulkdelete_function ambulkdelete;
+ amvacuumcleanup_function amvacuumcleanup;
+ amcanreturn_function amcanreturn; /* can be NULL */
+ amcostestimate_function amcostestimate;
+ amoptions_function amoptions;
+ amproperty_function amproperty; /* can be NULL */
+ amvalidate_function amvalidate;
+ amestimateparallelscan_function amestimateparallelscan; /* can be NULL */
+ aminitparallelscan_function aminitparallelscan; /* can be NULL */
+ ambeginscan_function ambeginscan;
+ amrescan_function amrescan;
+ amparallelrescan_function amparallelrescan; /* can be NULL */
+ amgettuple_function amgettuple; /* can be NULL */
+ amgetbitmap_function amgetbitmap; /* can be NULL */
+ amendscan_function amendscan;
+ ammarkpos_function ammarkpos; /* can be NULL */
+ amrestrpos_function amrestrpos; /* can be NULL */
+ } IndexAmRoutine;
@@ -459,6 +462,36 @@ amvalidate (Oid opclassoid);
invalid. Problems should be reported with ereport> messages.
+
+
+Size
+amestimateparallelscan (Size index_size);
+
+ Estimate and return the storage space required for a parallel index scan.
+ The index_size parameter gives the size of the generic parallel
+ index scan information.  The access method should add the size of its
+ index-type-specific parallel information to index_size and
+ return the result to the caller.
+
+
+
+
+void
+aminitparallelscan (void *target);
+
+ Initialize the parallel index scan state.  This is used to initialize the
+ index-type-specific parallel information, which is stored immediately
+ after the generic parallel information required for parallel index scans.
+ The required state information is set up in target.
+
+
+
+ The aminitparallelscan and amestimateparallelscan
+ functions need only be provided if the access method supports parallel
+ index scans.  If it doesn't, the aminitparallelscan and
+ amestimateparallelscan fields in its IndexAmRoutine
+ struct must be set to NULL.
+
The purpose of an index, of course, is to support scans for tuples matching
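
As an illustrative aside (not part of this patch), a hypothetical access method
"foo" that hands out one block at a time might implement the two optional
callbacks described above roughly as below.  FooParallelScanData and the foo*
names are invented for this sketch; add_size, slock_t, and SpinLockInit are
existing PostgreSQL facilities.

#include "postgres.h"
#include "storage/block.h"
#include "storage/shmem.h"
#include "storage/spin.h"

/* hypothetical AM-specific shared state for a parallel "foo" scan */
typedef struct FooParallelScanData
{
	BlockNumber	fps_nextBlock;	/* next block to hand out */
	slock_t		fps_mutex;		/* protects fps_nextBlock */
} FooParallelScanData;

Size
fooestimateparallelscan(Size index_size)
{
	/* reserve room for our shared state after the generic descriptor */
	return add_size(index_size, sizeof(FooParallelScanData));
}

void
fooinitparallelscan(void *target)
{
	FooParallelScanData *fps = (FooParallelScanData *) target;

	SpinLockInit(&fps->fps_mutex);
	fps->fps_nextBlock = InvalidBlockNumber;
}
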
@@ -511,6 +544,23 @@ amrescan (IndexScanDesc scan,
+void
+amparallelrescan (IndexScanDesc scan);
+
+ Restart the parallel index scan; this resets the shared parallel scan
+ state.  It must be called only when the scan is restarted, which is
+ typically needed for the inner side of a nested-loop join.
+
+
+
+ The amparallelrescan function need only be provided if the
+ access method supports parallel index scans.  If it doesn't,
+ the amparallelrescan field in its IndexAmRoutine
+ struct must be set to NULL.
+
+
+
+
boolean
amgettuple (IndexScanDesc scan,
ScanDirection direction);
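
Continuing the hypothetical "foo" AM from the earlier sketch (again an
assumption, not part of the patch), its amparallelrescan callback would simply
reset the shared state under the mutex so that a rescan starts from scratch.
It locates its area past the generic descriptor using the OffsetToPointer
macro and ps_offset introduced by this patch.

#include "postgres.h"
#include "access/relscan.h"
#include "storage/spin.h"

void
fooparallelrescan(IndexScanDesc scan)
{
	FooParallelScanData *fps;

	/* our area follows the generic parallel scan descriptor */
	fps = (FooParallelScanData *)
		OffsetToPointer((void *) scan->parallel_scan,
						scan->parallel_scan->ps_offset);

	SpinLockAcquire(&fps->fps_mutex);
	fps->fps_nextBlock = InvalidBlockNumber;
	SpinLockRelease(&fps->fps_mutex);
}
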
diff --git a/doc/src/sgml/monitoring.sgml b/doc/src/sgml/monitoring.sgml
index 1545f03..42ef91f 100644
--- a/doc/src/sgml/monitoring.sgml
+++ b/doc/src/sgml/monitoring.sgml
@@ -1199,7 +1199,7 @@ postgres 27093 0.0 0.0 30096 2752 ? Ss 11:34 0:00 postgres: ser
Waiting in an extension.
- IPC>
+ IPC>
BgWorkerShutdown>
Waiting for background worker to shut down.
@@ -1232,6 +1232,10 @@ postgres 27093 0.0 0.0 30096 2752 ? Ss 11:34 0:00 postgres: ser
Waiting for parallel workers to finish computing.
+ ParallelIndexScan>
+ Waiting for the next block to be available for a parallel index scan.
+
+
SafeSnapshot>
Waiting for a snapshot for a READ ONLY DEFERRABLE> transaction.
diff --git a/src/backend/access/brin/brin.c b/src/backend/access/brin/brin.c
index 1b45a4c..5ff6a0f 100644
--- a/src/backend/access/brin/brin.c
+++ b/src/backend/access/brin/brin.c
@@ -104,8 +104,11 @@ brinhandler(PG_FUNCTION_ARGS)
amroutine->amoptions = brinoptions;
amroutine->amproperty = NULL;
amroutine->amvalidate = brinvalidate;
+ amroutine->amestimateparallelscan = NULL;
+ amroutine->aminitparallelscan = NULL;
amroutine->ambeginscan = brinbeginscan;
amroutine->amrescan = brinrescan;
+ amroutine->amparallelrescan = NULL;
amroutine->amgettuple = NULL;
amroutine->amgetbitmap = bringetbitmap;
amroutine->amendscan = brinendscan;
diff --git a/src/backend/access/gin/ginutil.c b/src/backend/access/gin/ginutil.c
index f07eedc..6f7024e 100644
--- a/src/backend/access/gin/ginutil.c
+++ b/src/backend/access/gin/ginutil.c
@@ -61,8 +61,11 @@ ginhandler(PG_FUNCTION_ARGS)
amroutine->amoptions = ginoptions;
amroutine->amproperty = NULL;
amroutine->amvalidate = ginvalidate;
+ amroutine->amestimateparallelscan = NULL;
+ amroutine->aminitparallelscan = NULL;
amroutine->ambeginscan = ginbeginscan;
amroutine->amrescan = ginrescan;
+ amroutine->amparallelrescan = NULL;
amroutine->amgettuple = NULL;
amroutine->amgetbitmap = gingetbitmap;
amroutine->amendscan = ginendscan;
diff --git a/src/backend/access/gist/gist.c b/src/backend/access/gist/gist.c
index b8aa9bc..5727ef9 100644
--- a/src/backend/access/gist/gist.c
+++ b/src/backend/access/gist/gist.c
@@ -81,8 +81,11 @@ gisthandler(PG_FUNCTION_ARGS)
amroutine->amoptions = gistoptions;
amroutine->amproperty = gistproperty;
amroutine->amvalidate = gistvalidate;
+ amroutine->amestimateparallelscan = NULL;
+ amroutine->aminitparallelscan = NULL;
amroutine->ambeginscan = gistbeginscan;
amroutine->amrescan = gistrescan;
+ amroutine->amparallelrescan = NULL;
amroutine->amgettuple = gistgettuple;
amroutine->amgetbitmap = gistgetbitmap;
amroutine->amendscan = gistendscan;
diff --git a/src/backend/access/hash/hash.c b/src/backend/access/hash/hash.c
index 1fa087a..9e5740b 100644
--- a/src/backend/access/hash/hash.c
+++ b/src/backend/access/hash/hash.c
@@ -78,8 +78,11 @@ hashhandler(PG_FUNCTION_ARGS)
amroutine->amoptions = hashoptions;
amroutine->amproperty = NULL;
amroutine->amvalidate = hashvalidate;
+ amroutine->amestimateparallelscan = NULL;
+ amroutine->aminitparallelscan = NULL;
amroutine->ambeginscan = hashbeginscan;
amroutine->amrescan = hashrescan;
+ amroutine->amparallelrescan = NULL;
amroutine->amgettuple = hashgettuple;
amroutine->amgetbitmap = hashgetbitmap;
amroutine->amendscan = hashendscan;
diff --git a/src/backend/access/index/genam.c b/src/backend/access/index/genam.c
index 65c941d..84ebd72 100644
--- a/src/backend/access/index/genam.c
+++ b/src/backend/access/index/genam.c
@@ -375,7 +375,7 @@ systable_beginscan(Relation heapRelation,
}
sysscan->iscan = index_beginscan(heapRelation, irel,
- snapshot, nkeys, 0);
+ snapshot, NULL, nkeys, 0, false);
index_rescan(sysscan->iscan, key, nkeys, NULL, 0);
sysscan->scan = NULL;
}
@@ -577,7 +577,7 @@ systable_beginscan_ordered(Relation heapRelation,
}
sysscan->iscan = index_beginscan(heapRelation, indexRelation,
- snapshot, nkeys, 0);
+ snapshot, NULL, nkeys, 0, false);
index_rescan(sysscan->iscan, key, nkeys, NULL, 0);
sysscan->scan = NULL;
diff --git a/src/backend/access/index/indexam.c b/src/backend/access/index/indexam.c
index 54b71cb..72e1b03 100644
--- a/src/backend/access/index/indexam.c
+++ b/src/backend/access/index/indexam.c
@@ -207,6 +207,70 @@ index_insert(Relation indexRelation,
}
/*
+ * index_parallelscan_estimate - estimate storage for ParallelIndexScanDesc
+ *
+ * Calls the AM-specific routine, if any, to obtain the size of the
+ * AM-specific shared information.
+ */
+Size
+index_parallelscan_estimate(Relation indexrel, Snapshot snapshot)
+{
+ Size index_size = add_size(offsetof(ParallelIndexScanDescData, ps_snapshot_data),
+ EstimateSnapshotSpace(snapshot));
+
+ /* amestimateparallelscan is optional; assume no-op if not provided by AM */
+ if (indexrel->rd_amroutine->amestimateparallelscan == NULL)
+ return index_size;
+ else
+ return indexrel->rd_amroutine->amestimateparallelscan(index_size);
+}
+
+/*
+ * index_parallelscan_initialize - initialize ParallelIndexScanDesc
+ *
+ * Calls the access-method-specific initialization routine to initialize the
+ * AM-specific shared information.  Call this just once in the leader
+ * process; individual workers then attach via index_beginscan_parallel.
+ */
+void
+index_parallelscan_initialize(Relation heaprel, Relation indexrel, Snapshot snapshot, ParallelIndexScanDesc target)
+{
+ Size offset = add_size(offsetof(ParallelIndexScanDescData, ps_snapshot_data),
+ EstimateSnapshotSpace(snapshot));
+ void *amtarget = (char *) ((void *) target) + offset;
+
+ target->ps_relid = RelationGetRelid(heaprel);
+ target->ps_indexid = RelationGetRelid(indexrel);
+ target->ps_offset = offset;
+ SerializeSnapshot(snapshot, target->ps_snapshot_data);
+
+ /* aminitparallelscan is optional; assume no-op if not provided by AM */
+ if (indexrel->rd_amroutine->aminitparallelscan == NULL)
+ return;
+
+ indexrel->rd_amroutine->aminitparallelscan(amtarget);
+}
+
+/*
+ * index_beginscan_parallel - join parallel index scan
+ *
+ * Caller must be holding suitable locks on the heap and the index.
+ */
+IndexScanDesc
+index_beginscan_parallel(Relation heaprel, Relation indexrel, int nkeys,
+ int norderbys, ParallelIndexScanDesc pscan)
+{
+ Snapshot snapshot;
+ IndexScanDesc scan;
+
+ Assert(RelationGetRelid(heaprel) == pscan->ps_relid);
+ snapshot = RestoreSnapshot(pscan->ps_snapshot_data);
+ RegisterSnapshot(snapshot);
+ scan = index_beginscan(heaprel, indexrel, snapshot, pscan, nkeys, norderbys, true);
+ return scan;
+}
+
+/*
* index_beginscan - start a scan of an index with amgettuple
*
* Caller must be holding suitable locks on the heap and the index.
@@ -214,8 +278,8 @@ index_insert(Relation indexRelation,
IndexScanDesc
index_beginscan(Relation heapRelation,
Relation indexRelation,
- Snapshot snapshot,
- int nkeys, int norderbys)
+ Snapshot snapshot, ParallelIndexScanDesc pscan,
+ int nkeys, int norderbys, bool temp_snap)
{
IndexScanDesc scan;
@@ -227,6 +291,8 @@ index_beginscan(Relation heapRelation,
*/
scan->heapRelation = heapRelation;
scan->xs_snapshot = snapshot;
+ scan->parallel_scan = pscan;
+ scan->xs_temp_snap = temp_snap;
return scan;
}
@@ -251,6 +317,8 @@ index_beginscan_bitmap(Relation indexRelation,
* up by RelationGetIndexScan.
*/
scan->xs_snapshot = snapshot;
+ scan->parallel_scan = NULL;
+ scan->xs_temp_snap = false;
return scan;
}
@@ -319,6 +387,22 @@ index_rescan(IndexScanDesc scan,
}
/* ----------------
+ * index_parallelrescan - (re)start a parallel scan of an index
+ * ----------------
+ */
+void
+index_parallelrescan(IndexScanDesc scan)
+{
+ SCAN_CHECKS;
+
+ /* amparallelrescan is optional; assume no-op if not provided by AM */
+ if (scan->indexRelation->rd_amroutine->amparallelrescan == NULL)
+ return;
+
+ scan->indexRelation->rd_amroutine->amparallelrescan(scan);
+}
+
+/* ----------------
* index_endscan - end a scan
* ----------------
*/
@@ -341,6 +425,9 @@ index_endscan(IndexScanDesc scan)
/* Release index refcount acquired by index_beginscan */
RelationDecrementReferenceCount(scan->indexRelation);
+ if (scan->xs_temp_snap)
+ UnregisterSnapshot(scan->xs_snapshot);
+
/* Release the scan data structure itself */
IndexScanEnd(scan);
}
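
As an aside (not part of the patch), the intended division of labour between
the leader and the workers when using the new entry points above might look
roughly like the following sketch.  The machinery for allocating dynamic
shared memory and launching workers is assumed to exist elsewhere, and the
leader_setup_pscan/worker_attach_pscan names are invented for illustration.

#include "postgres.h"
#include "access/genam.h"
#include "access/relscan.h"

/* Leader: initialize the shared descriptor placed in dynamic shared memory. */
static ParallelIndexScanDesc
leader_setup_pscan(Relation heaprel, Relation indexrel, Snapshot snapshot,
				   void *shared_space)	/* sized via index_parallelscan_estimate() */
{
	ParallelIndexScanDesc pscan = (ParallelIndexScanDesc) shared_space;

	/* serializes the snapshot and lets the AM set up its private area */
	index_parallelscan_initialize(heaprel, indexrel, snapshot, pscan);
	return pscan;
}

/* Worker (or leader acting as a participant): attach to the shared scan. */
static IndexScanDesc
worker_attach_pscan(Relation heaprel, Relation indexrel,
					ParallelIndexScanDesc pscan, ScanKey keys, int nkeys)
{
	/*
	 * Restores and registers the serialized snapshot; xs_temp_snap makes
	 * index_endscan unregister it again.
	 */
	IndexScanDesc scan = index_beginscan_parallel(heaprel, indexrel,
												  nkeys, 0, pscan);

	index_rescan(scan, keys, nkeys, NULL, 0);
	return scan;
}
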
diff --git a/src/backend/access/nbtree/nbtree.c b/src/backend/access/nbtree/nbtree.c
index a264b92..e749a63 100644
--- a/src/backend/access/nbtree/nbtree.c
+++ b/src/backend/access/nbtree/nbtree.c
@@ -23,6 +23,8 @@
#include "access/xlog.h"
#include "catalog/index.h"
#include "commands/vacuum.h"
+#include "pgstat.h"
+#include "storage/condition_variable.h"
#include "storage/indexfsm.h"
#include "storage/ipc.h"
#include "storage/lmgr.h"
@@ -62,6 +64,25 @@ typedef struct
MemoryContext pagedelcontext;
} BTVacState;
+/*
+ * BTParallelScanDescData contains btree specific shared information required
+ * for parallel scan.
+ */
+typedef struct BTParallelScanDescData
+{
+ BlockNumber ps_nextPage; /* latest or next page to be scanned */
+ uint8 ps_pageStatus; /* indicates whether next page is available
+ * for scan.  See nbtree.h for the possible
+ * states of a parallel scan. */
+ int ps_arrayKeyCount; /* count indicating number of array
+ * scan keys processed by parallel
+ * scan */
+ slock_t ps_mutex; /* protects above variables */
+ ConditionVariable cv; /* used to synchronize parallel scan */
+} BTParallelScanDescData;
+
+typedef struct BTParallelScanDescData *BTParallelScanDesc;
+
static void btbuildCallback(Relation index,
HeapTuple htup,
@@ -110,8 +131,11 @@ bthandler(PG_FUNCTION_ARGS)
amroutine->amoptions = btoptions;
amroutine->amproperty = btproperty;
amroutine->amvalidate = btvalidate;
+ amroutine->amestimateparallelscan = btestimateparallelscan;
+ amroutine->aminitparallelscan = btinitparallelscan;
amroutine->ambeginscan = btbeginscan;
amroutine->amrescan = btrescan;
+ amroutine->amparallelrescan = btparallelrescan;
amroutine->amgettuple = btgettuple;
amroutine->amgetbitmap = btgetbitmap;
amroutine->amendscan = btendscan;
@@ -467,6 +491,175 @@ btbeginscan(Relation rel, int nkeys, int norderbys)
}
/*
+ * btestimateparallelscan - estimate storage for BTParallelScanDescData
+ */
+Size
+btestimateparallelscan(Size index_size)
+{
+ return add_size(index_size, sizeof(BTParallelScanDescData));
+}
+
+/*
+ * btinitparallelscan - Initializing BTParallelScanDesc for parallel btree scan
+ */
+void
+btinitparallelscan(void *target)
+{
+ BTParallelScanDesc bt_target = (BTParallelScanDesc) target;
+
+ SpinLockInit(&bt_target->ps_mutex);
+ bt_target->ps_nextPage = InvalidBlockNumber;
+ bt_target->ps_pageStatus = BTPARALLEL_NOT_INITIALIZED;
+ bt_target->ps_arrayKeyCount = 0;
+ ConditionVariableInit(&bt_target->cv);
+}
+
+/*
+ * _bt_parallel_seize() -- returns the next block to be scanned for forward
+ *			scans, or the latest block already scanned for backward scans.
+ *
+ * status - True indicates that the block number returned is valid and the
+ *			scan is continuing, or that the block number is invalid and the
+ *			scan has just begun, or that the block number is P_NONE and the
+ *			scan is finished.  False indicates that we have reached the end
+ *			of the scan for the current scankeys, in which case we return
+ *			P_NONE as the block number.
+ *
+ * The first time the master backend or a worker hits the last page, it
+ * returns P_NONE with status 'true'; after that, any participant that tries
+ * to fetch the next page gets status 'false'.
+ *
+ * Callers should ignore the return value if status is false.
+ */
+BlockNumber
+_bt_parallel_seize(IndexScanDesc scan, bool *status)
+{
+ BTScanOpaque so = (BTScanOpaque) scan->opaque;
+ uint8 pageStatus;
+ bool exit_loop = false;
+ BlockNumber nextPage = InvalidBlockNumber;
+ BTParallelScanDesc btscan = (BTParallelScanDesc) OffsetToPointer(
+ (void *) scan->parallel_scan,
+ scan->parallel_scan->ps_offset);
+
+ *status = true;
+ while (1)
+ {
+ SpinLockAcquire(&btscan->ps_mutex);
+ pageStatus = btscan->ps_pageStatus;
+ if (so->arrayKeyCount < btscan->ps_arrayKeyCount)
+ *status = false;
+ else if (pageStatus == BTPARALLEL_DONE)
+ *status = false;
+ else if (pageStatus != BTPARALLEL_ADVANCING)
+ {
+ btscan->ps_pageStatus = BTPARALLEL_ADVANCING;
+ nextPage = btscan->ps_nextPage;
+ exit_loop = true;
+ }
+ SpinLockRelease(&btscan->ps_mutex);
+ if (exit_loop || *status == false)
+ break;
+ ConditionVariableSleep(&btscan->cv, WAIT_EVENT_PARALLEL_INDEX_SCAN);
+ }
+ ConditionVariableCancelSleep();
+
+ /* no more pages to scan */
+ if (*status == false)
+ return P_NONE;
+
+ *status = true;
+ return nextPage;
+}
+
+/*
+ * _bt_parallel_release() -- Advances the parallel scan to allow scan of next
+ * page
+ *
+ * It updates the next-page value so that the parallel scan can move forward
+ * or backward, depending on the scan direction, and wakes up one of the
+ * sleeping workers.
+ *
+ * For backward scan, next_page holds the latest page being scanned.
+ * For forward scan, next_page holds the next page to be scanned.
+ */
+void
+_bt_parallel_release(IndexScanDesc scan, BlockNumber next_page)
+{
+ BTParallelScanDesc btscan = (BTParallelScanDesc) OffsetToPointer(
+ (void *) scan->parallel_scan,
+ scan->parallel_scan->ps_offset);
+
+ SpinLockAcquire(&btscan->ps_mutex);
+ btscan->ps_nextPage = next_page;
+ btscan->ps_pageStatus = BTPARALLEL_IDLE;
+ SpinLockRelease(&btscan->ps_mutex);
+ ConditionVariableSignal(&btscan->cv);
+}
+
+/*
+ * _bt_parallel_done() -- Finishes the parallel scan
+ *
+ * This must be called when there are no pages left to scan.  It notifies all
+ * the other workers associated with this scan that the parallel scan has
+ * ended.
+ */
+void
+_bt_parallel_done(IndexScanDesc scan)
+{
+ BTScanOpaque so = (BTScanOpaque) scan->opaque;
+ BTParallelScanDesc btscan;
+ bool status_changed = false;
+
+ /* Do nothing, for non-parallel scans */
+ if (scan->parallel_scan == NULL)
+ return;
+
+ btscan = (BTParallelScanDesc) OffsetToPointer((void *) scan->parallel_scan,
+ scan->parallel_scan->ps_offset);
+
+ /*
+ * Mark the parallel scan as done no more than once per scan.  We rely on
+ * this state to initiate the next scan when there are multiple array
+ * keys; see _bt_advance_array_keys.
+ */
+ SpinLockAcquire(&btscan->ps_mutex);
+ if (so->arrayKeyCount >= btscan->ps_arrayKeyCount &&
+ btscan->ps_pageStatus != BTPARALLEL_DONE)
+ {
+ btscan->ps_pageStatus = BTPARALLEL_DONE;
+ status_changed = true;
+ }
+ SpinLockRelease(&btscan->ps_mutex);
+
+ /* wake up all the workers associated with this parallel scan */
+ if (status_changed)
+ ConditionVariableBroadcast(&btscan->cv);
+}
+
+/*
+ * _bt_parallel_advance_scan() -- Advances the parallel scan to the next
+ *			set of array keys
+ *
+ * It updates the count of array keys processed, both in the local scan
+ * state and in the shared parallel scan state.
+ */
+void
+_bt_parallel_advance_scan(IndexScanDesc scan)
+{
+ BTScanOpaque so = (BTScanOpaque) scan->opaque;
+ BTParallelScanDesc btscan = (BTParallelScanDesc) OffsetToPointer(
+ (void *) scan->parallel_scan,
+ scan->parallel_scan->ps_offset);
+
+ so->arrayKeyCount++;
+ SpinLockAcquire(&btscan->ps_mutex);
+ if (btscan->ps_pageStatus == BTPARALLEL_DONE)
+ {
+ btscan->ps_nextPage = InvalidBlockNumber;
+ btscan->ps_pageStatus = BTPARALLEL_NOT_INITIALIZED;
+ btscan->ps_arrayKeyCount++;
+ }
+ SpinLockRelease(&btscan->ps_mutex);
+}
+
+/*
* btrescan() -- rescan an index relation
*/
void
@@ -486,6 +679,7 @@ btrescan(IndexScanDesc scan, ScanKey scankey, int nscankeys,
}
so->markItemIndex = -1;
+ so->arrayKeyCount = 0;
BTScanPosUnpinIfPinned(so->markPos);
BTScanPosInvalidate(so->markPos);
@@ -526,6 +720,31 @@ btrescan(IndexScanDesc scan, ScanKey scankey, int nscankeys,
}
/*
+ * btparallelrescan() -- reset parallel scan
+ */
+void
+btparallelrescan(IndexScanDesc scan)
+{
+ if (scan->parallel_scan)
+ {
+ BTParallelScanDesc btscan;
+
+ btscan = (BTParallelScanDesc) OffsetToPointer(
+ (void *) scan->parallel_scan,
+ scan->parallel_scan->ps_offset);
+
+ /*
+ * In principle we don't need to acquire the spinlock here, but being
+ * consistent with heap_rescan seems like a good idea.
+ */
+ SpinLockAcquire(&btscan->ps_mutex);
+ btscan->ps_nextPage = InvalidBlockNumber;
+ btscan->ps_pageStatus = BTPARALLEL_NOT_INITIALIZED;
+ SpinLockRelease(&btscan->ps_mutex);
+ }
+}
+
+/*
* btendscan() -- close down a scan
*/
void
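
To summarize the protocol implemented above (a condensed sketch, not code from
the patch): a participant seizes the scan, reads the page it was handed, and
releases the scan with the following block number before processing that
page's tuples, so other participants can advance in parallel.  Backward scans
release the current block number instead of btpo_next.  The nbtsearch.c
changes below implement this in full; the simplified forward-scan loop, with
the scan_forward_pages name invented for illustration, is roughly:

#include "postgres.h"
#include "access/nbtree.h"

static void
scan_forward_pages(IndexScanDesc scan, Relation rel)
{
	for (;;)
	{
		bool		status;
		BlockNumber	blkno = _bt_parallel_seize(scan, &status);
		Buffer		buf;
		Page		page;
		BTPageOpaque opaque;

		if (!status)
			break;				/* scan already finished for these keys */
		if (blkno == P_NONE)
		{
			_bt_parallel_done(scan);	/* no pages left; wake all waiters */
			break;
		}
		if (blkno == InvalidBlockNumber)
			break;				/* scan not started; _bt_first must descend first */

		buf = _bt_getbuf(rel, blkno, BT_READ);
		page = BufferGetPage(buf);
		opaque = (BTPageOpaque) PageGetSpecialPointer(page);

		/* let another participant advance to the next page right away */
		_bt_parallel_release(scan, opaque->btpo_next);

		/* ... match and save tuples from "page" here ... */

		_bt_relbuf(rel, buf);
	}
}
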
diff --git a/src/backend/access/nbtree/nbtsearch.c b/src/backend/access/nbtree/nbtsearch.c
index ee46023..d9e0a5f 100644
--- a/src/backend/access/nbtree/nbtsearch.c
+++ b/src/backend/access/nbtree/nbtsearch.c
@@ -30,9 +30,12 @@ static bool _bt_readpage(IndexScanDesc scan, ScanDirection dir,
static void _bt_saveitem(BTScanOpaque so, int itemIndex,
OffsetNumber offnum, IndexTuple itup);
static bool _bt_steppage(IndexScanDesc scan, ScanDirection dir);
+static bool _bt_readnextpage(IndexScanDesc scan, BlockNumber blkno, ScanDirection dir);
+static bool _bt_parallel_readpage(IndexScanDesc scan, BlockNumber blkno, ScanDirection dir);
static Buffer _bt_walk_left(Relation rel, Buffer buf, Snapshot snapshot);
static bool _bt_endpoint(IndexScanDesc scan, ScanDirection dir);
static void _bt_drop_lock_and_maybe_pin(IndexScanDesc scan, BTScanPos sp);
+static inline void _bt_initialize_more_data(BTScanOpaque so, ScanDirection dir);
/*
@@ -544,8 +547,10 @@ _bt_first(IndexScanDesc scan, ScanDirection dir)
ScanKeyData notnullkeys[INDEX_MAX_KEYS];
int keysCount = 0;
int i;
+ bool status = true;
StrategyNumber strat_total;
BTScanPosItem *currItem;
+ BlockNumber blkno;
Assert(!BTScanPosIsValid(so->currPos));
@@ -564,6 +569,38 @@ _bt_first(IndexScanDesc scan, ScanDirection dir)
if (!so->qual_ok)
return false;
+ /*
+ * For parallel scans, get the page to scan from the shared state.  If the
+ * scan has not started yet, proceed to find the first leaf page to scan,
+ * keeping the other workers waiting until we have descended to the
+ * appropriate leaf page to be scanned for matching tuples.
+ *
+ * If the scan has already begun, skip the descent and directly read the
+ * page stored in the shared structure, or the page to its left in the
+ * case of a backward scan.
+ */
+ if (scan->parallel_scan != NULL)
+ {
+ blkno = _bt_parallel_seize(scan, &status);
+ if (status == false)
+ {
+ BTScanPosInvalidate(so->currPos);
+ return false;
+ }
+ else if (blkno == P_NONE)
+ {
+ _bt_parallel_done(scan);
+ BTScanPosInvalidate(so->currPos);
+ return false;
+ }
+ else if (blkno != InvalidBlockNumber)
+ {
+ if (!_bt_parallel_readpage(scan, blkno, dir))
+ return false;
+ goto readcomplete;
+ }
+ }
+
/*----------
* Examine the scan keys to discover where we need to start the scan.
*
@@ -743,7 +780,19 @@ _bt_first(IndexScanDesc scan, ScanDirection dir)
* there.
*/
if (keysCount == 0)
- return _bt_endpoint(scan, dir);
+ {
+ bool match;
+
+ match = _bt_endpoint(scan, dir);
+ if (!match)
+ {
+ /* No match, indicate (parallel) scan finished */
+ _bt_parallel_done(scan);
+ BTScanPosInvalidate(so->currPos);
+ }
+
+ return match;
+ }
/*
* We want to start the scan somewhere within the index. Set up an
@@ -993,25 +1042,21 @@ _bt_first(IndexScanDesc scan, ScanDirection dir)
* because nothing finer to lock exists.
*/
PredicateLockRelation(rel, scan->xs_snapshot);
+
+ /*
+ * mark parallel scan as done, so that all the workers can finish
+ * their scan
+ */
+ _bt_parallel_done(scan);
+ BTScanPosInvalidate(so->currPos);
+
return false;
}
else
PredicateLockPage(rel, BufferGetBlockNumber(buf),
scan->xs_snapshot);
- /* initialize moreLeft/moreRight appropriately for scan direction */
- if (ScanDirectionIsForward(dir))
- {
- so->currPos.moreLeft = false;
- so->currPos.moreRight = true;
- }
- else
- {
- so->currPos.moreLeft = true;
- so->currPos.moreRight = false;
- }
- so->numKilled = 0; /* just paranoia */
- Assert(so->markItemIndex == -1);
+ _bt_initialize_more_data(so, dir);
/* position to the precise item on the page */
offnum = _bt_binsrch(rel, buf, keysCount, scankeys, nextkey);
@@ -1060,6 +1105,7 @@ _bt_first(IndexScanDesc scan, ScanDirection dir)
_bt_drop_lock_and_maybe_pin(scan, &so->currPos);
}
+readcomplete:
/* OK, itemIndex says what to return */
currItem = &so->currPos.items[so->currPos.itemIndex];
scan->xs_ctup.t_self = currItem->heapTid;
@@ -1154,6 +1200,16 @@ _bt_readpage(IndexScanDesc scan, ScanDirection dir, OffsetNumber offnum)
page = BufferGetPage(so->currPos.buf);
opaque = (BTPageOpaque) PageGetSpecialPointer(page);
+
+ /* allow next page to be processed by a parallel worker */
+ if (scan->parallel_scan)
+ {
+ if (ScanDirectionIsForward(dir))
+ _bt_parallel_release(scan, opaque->btpo_next);
+ else
+ _bt_parallel_release(scan, BufferGetBlockNumber(so->currPos.buf));
+ }
+
minoff = P_FIRSTDATAKEY(opaque);
maxoff = PageGetMaxOffsetNumber(page);
@@ -1278,21 +1334,16 @@ _bt_saveitem(BTScanOpaque so, int itemIndex,
* if pinned, we'll drop the pin before moving to next page. The buffer is
* not locked on entry.
*
- * On success exit, so->currPos is updated to contain data from the next
- * interesting page. For success on a scan using a non-MVCC snapshot we hold
- * a pin, but not a read lock, on that page. If we do not hold the pin, we
- * set so->currPos.buf to InvalidBuffer. We return TRUE to indicate success.
- *
- * If there are no more matching records in the given direction, we drop all
- * locks and pins, set so->currPos.buf to InvalidBuffer, and return FALSE.
+ * For success on a scan using a non-MVCC snapshot we hold a pin, but not a
+ * read lock, on that page. If we do not hold the pin, we set so->currPos.buf
+ * to InvalidBuffer. We return TRUE to indicate success.
*/
static bool
_bt_steppage(IndexScanDesc scan, ScanDirection dir)
{
BTScanOpaque so = (BTScanOpaque) scan->opaque;
- Relation rel;
- Page page;
- BTPageOpaque opaque;
+ BlockNumber blkno = InvalidBlockNumber;
+ bool status = true;
Assert(BTScanPosIsValid(so->currPos));
@@ -1319,13 +1370,27 @@ _bt_steppage(IndexScanDesc scan, ScanDirection dir)
so->markItemIndex = -1;
}
- rel = scan->indexRelation;
-
if (ScanDirectionIsForward(dir))
{
/* Walk right to the next page with data */
- /* We must rely on the previously saved nextPage link! */
- BlockNumber blkno = so->currPos.nextPage;
+
+ /*
+ * We must rely on the previously saved nextPage link for non-parallel
+ * scans!
+ */
+ if (scan->parallel_scan != NULL)
+ {
+ blkno = _bt_parallel_seize(scan, &status);
+ if (status == false)
+ {
+ /* release the previous buffer, if pinned */
+ BTScanPosUnpinIfPinned(so->currPos);
+ BTScanPosInvalidate(so->currPos);
+ return false;
+ }
+ }
+ else
+ blkno = so->currPos.nextPage;
/* Remember we left a page with data */
so->currPos.moreLeft = true;
@@ -1333,11 +1398,68 @@ _bt_steppage(IndexScanDesc scan, ScanDirection dir)
/* release the previous buffer, if pinned */
BTScanPosUnpinIfPinned(so->currPos);
+ if (!_bt_readnextpage(scan, blkno, dir))
+ return false;
+ }
+ else
+ {
+ /* Remember we left a page with data */
+ so->currPos.moreRight = true;
+
+ /* For parallel scans, get the last page scanned */
+ if (scan->parallel_scan != NULL)
+ {
+ blkno = _bt_parallel_seize(scan, &status);
+ BTScanPosUnpinIfPinned(so->currPos);
+ if (status == false)
+ {
+ BTScanPosInvalidate(so->currPos);
+ return false;
+ }
+ }
+
+ if (!_bt_readnextpage(scan, blkno, dir))
+ return false;
+ }
+
+ /* Drop the lock, and maybe the pin, on the current page */
+ _bt_drop_lock_and_maybe_pin(scan, &so->currPos);
+
+ return true;
+}
+
+/*
+ * _bt_readnextpage() -- Read next page containing valid data for scan
+ *
+ * On success exit, so->currPos is updated to contain data from the next
+ * interesting page.  The caller is responsible for releasing the lock and
+ * pin on the buffer on success.  We return TRUE to indicate success.
+ *
+ * If there are no more matching records in the given direction, we drop all
+ * locks and pins, set so->currPos.buf to InvalidBuffer, and return FALSE.
+ */
+static bool
+_bt_readnextpage(IndexScanDesc scan, BlockNumber blkno, ScanDirection dir)
+{
+ BTScanOpaque so = (BTScanOpaque) scan->opaque;
+ Relation rel;
+ Page page;
+ BTPageOpaque opaque;
+ bool status = true;
+
+ rel = scan->indexRelation;
+
+ if (ScanDirectionIsForward(dir))
+ {
for (;;)
{
- /* if we're at end of scan, give up */
+ /*
+ * if we're at end of scan, give up and mark parallel scan as
+ * done, so that all the workers can finish their scan
+ */
if (blkno == P_NONE || !so->currPos.moreRight)
{
+ _bt_parallel_done(scan);
BTScanPosInvalidate(so->currPos);
return false;
}
@@ -1345,10 +1467,10 @@ _bt_steppage(IndexScanDesc scan, ScanDirection dir)
CHECK_FOR_INTERRUPTS();
/* step right one page */
so->currPos.buf = _bt_getbuf(rel, blkno, BT_READ);
- /* check for deleted page */
page = BufferGetPage(so->currPos.buf);
TestForOldSnapshot(scan->xs_snapshot, rel, page);
opaque = (BTPageOpaque) PageGetSpecialPointer(page);
+ /* check for deleted page */
if (!P_IGNORE(opaque))
{
PredicateLockPage(rel, blkno, scan->xs_snapshot);
@@ -1359,14 +1481,30 @@ _bt_steppage(IndexScanDesc scan, ScanDirection dir)
}
/* nope, keep going */
- blkno = opaque->btpo_next;
+ if (scan->parallel_scan != NULL)
+ {
+ blkno = _bt_parallel_seize(scan, &status);
+ if (status == false)
+ {
+ _bt_relbuf(rel, so->currPos.buf);
+ BTScanPosInvalidate(so->currPos);
+ return false;
+ }
+ }
+ else
+ blkno = opaque->btpo_next;
_bt_relbuf(rel, so->currPos.buf);
}
}
else
{
- /* Remember we left a page with data */
- so->currPos.moreRight = true;
+ /*
+ * For parallel scans, the current block number has to be retrieved
+ * from the shared state, and it is the caller's responsibility to
+ * pass the correct block number.
+ */
+ if (blkno != InvalidBlockNumber)
+ so->currPos.currPage = blkno;
/*
* Walk left to the next page with data. This is much more complex
@@ -1401,6 +1539,12 @@ _bt_steppage(IndexScanDesc scan, ScanDirection dir)
if (!so->currPos.moreLeft)
{
_bt_relbuf(rel, so->currPos.buf);
+
+ /*
+ * mark parallel scan as done, so that all the workers can
+ * finish their scan
+ */
+ _bt_parallel_done(scan);
BTScanPosInvalidate(so->currPos);
return false;
}
@@ -1412,6 +1556,7 @@ _bt_steppage(IndexScanDesc scan, ScanDirection dir)
/* if we're physically at end of index, return failure */
if (so->currPos.buf == InvalidBuffer)
{
+ _bt_parallel_done(scan);
BTScanPosInvalidate(so->currPos);
return false;
}
@@ -1432,9 +1577,48 @@ _bt_steppage(IndexScanDesc scan, ScanDirection dir)
if (_bt_readpage(scan, dir, PageGetMaxOffsetNumber(page)))
break;
}
+
+ /*
+ * For parallel scans, get the last page scanned, as it is quite
+ * possible that by the time we try to fetch the previous page,
+ * another worker has already decided to scan that same page.  We
+ * could avoid this by calling _bt_parallel_release only after
+ * reading the current page, but it is bad to make other workers
+ * wait until we have read the page.
+ */
+ if (scan->parallel_scan != NULL)
+ {
+ _bt_relbuf(rel, so->currPos.buf);
+ blkno = _bt_parallel_seize(scan, &status);
+ if (status == false)
+ {
+ BTScanPosInvalidate(so->currPos);
+ return false;
+ }
+ so->currPos.buf = _bt_getbuf(rel, blkno, BT_READ);
+ }
}
}
+ return true;
+}
+
+/*
+ * _bt_parallel_readpage() -- Read current page containing valid data for scan
+ *
+ * On success, we release the lock, and possibly the pin, on the buffer, and
+ * return TRUE to indicate success.
+ */
+static bool
+_bt_parallel_readpage(IndexScanDesc scan, BlockNumber blkno, ScanDirection dir)
+{
+ BTScanOpaque so = (BTScanOpaque) scan->opaque;
+
+ _bt_initialize_more_data(so, dir);
+
+ if (!_bt_readnextpage(scan, blkno, dir))
+ return false;
+
/* Drop the lock, and maybe the pin, on the current page */
_bt_drop_lock_and_maybe_pin(scan, &so->currPos);
@@ -1712,19 +1896,7 @@ _bt_endpoint(IndexScanDesc scan, ScanDirection dir)
/* remember which buffer we have pinned */
so->currPos.buf = buf;
- /* initialize moreLeft/moreRight appropriately for scan direction */
- if (ScanDirectionIsForward(dir))
- {
- so->currPos.moreLeft = false;
- so->currPos.moreRight = true;
- }
- else
- {
- so->currPos.moreLeft = true;
- so->currPos.moreRight = false;
- }
- so->numKilled = 0; /* just paranoia */
- so->markItemIndex = -1; /* ditto */
+ _bt_initialize_more_data(so, dir);
/*
* Now load data from the first page of the scan.
@@ -1753,3 +1925,25 @@ _bt_endpoint(IndexScanDesc scan, ScanDirection dir)
return true;
}
+
+/*
+ * _bt_initialize_more_data() -- initialize moreLeft/moreRight appropriately
+ * for scan direction
+ */
+static inline void
+_bt_initialize_more_data(BTScanOpaque so, ScanDirection dir)
+{
+ /* initialize moreLeft/moreRight appropriately for scan direction */
+ if (ScanDirectionIsForward(dir))
+ {
+ so->currPos.moreLeft = false;
+ so->currPos.moreRight = true;
+ }
+ else
+ {
+ so->currPos.moreLeft = true;
+ so->currPos.moreRight = false;
+ }
+ so->numKilled = 0; /* just paranoia */
+ so->markItemIndex = -1; /* ditto */
+}
diff --git a/src/backend/access/nbtree/nbtutils.c b/src/backend/access/nbtree/nbtutils.c
index 063c988..15ea6df 100644
--- a/src/backend/access/nbtree/nbtutils.c
+++ b/src/backend/access/nbtree/nbtutils.c
@@ -590,6 +590,10 @@ _bt_advance_array_keys(IndexScanDesc scan, ScanDirection dir)
break;
}
+ /* advance parallel scan */
+ if (scan->parallel_scan != NULL)
+ _bt_parallel_advance_scan(scan);
+
return found;
}
diff --git a/src/backend/access/spgist/spgutils.c b/src/backend/access/spgist/spgutils.c
index d570ae5..4f39e93 100644
--- a/src/backend/access/spgist/spgutils.c
+++ b/src/backend/access/spgist/spgutils.c
@@ -60,8 +60,11 @@ spghandler(PG_FUNCTION_ARGS)
amroutine->amoptions = spgoptions;
amroutine->amproperty = NULL;
amroutine->amvalidate = spgvalidate;
+ amroutine->amestimateparallelscan = NULL;
+ amroutine->aminitparallelscan = NULL;
amroutine->ambeginscan = spgbeginscan;
amroutine->amrescan = spgrescan;
+ amroutine->amparallelrescan = NULL;
amroutine->amgettuple = spggettuple;
amroutine->amgetbitmap = spggetbitmap;
amroutine->amendscan = spgendscan;
diff --git a/src/backend/commands/cluster.c b/src/backend/commands/cluster.c
index 2131226..02ca783 100644
--- a/src/backend/commands/cluster.c
+++ b/src/backend/commands/cluster.c
@@ -903,7 +903,7 @@ copy_heap_data(Oid OIDNewHeap, Oid OIDOldHeap, Oid OIDOldIndex, bool verbose,
if (OldIndex != NULL && !use_sort)
{
heapScan = NULL;
- indexScan = index_beginscan(OldHeap, OldIndex, SnapshotAny, 0, 0);
+ indexScan = index_beginscan(OldHeap, OldIndex, SnapshotAny, NULL, 0, 0, false);
index_rescan(indexScan, NULL, 0, NULL, 0);
}
else
diff --git a/src/backend/executor/execIndexing.c b/src/backend/executor/execIndexing.c
index 009c1b7..f8e662b 100644
--- a/src/backend/executor/execIndexing.c
+++ b/src/backend/executor/execIndexing.c
@@ -722,7 +722,7 @@ check_exclusion_or_unique_constraint(Relation heap, Relation index,
retry:
conflict = false;
found_self = false;
- index_scan = index_beginscan(heap, index, &DirtySnapshot, index_natts, 0);
+ index_scan = index_beginscan(heap, index, &DirtySnapshot, NULL, index_natts, 0, false);
index_rescan(index_scan, scankeys, index_natts, NULL, 0);
while ((tup = index_getnext(index_scan,
diff --git a/src/backend/executor/nodeBitmapIndexscan.c b/src/backend/executor/nodeBitmapIndexscan.c
index a364098..43c610c 100644
--- a/src/backend/executor/nodeBitmapIndexscan.c
+++ b/src/backend/executor/nodeBitmapIndexscan.c
@@ -26,6 +26,7 @@
#include "executor/nodeIndexscan.h"
#include "miscadmin.h"
#include "utils/memutils.h"
+#include "access/relscan.h"
/* ----------------------------------------------------------------
@@ -48,6 +49,7 @@ MultiExecBitmapIndexScan(BitmapIndexScanState *node)
* extract necessary information from index scan node
*/
scandesc = node->biss_ScanDesc;
+ scandesc->parallel_scan = NULL;
/*
* If we have runtime keys and they've not already been set up, do it now.
diff --git a/src/backend/executor/nodeIndexonlyscan.c b/src/backend/executor/nodeIndexonlyscan.c
index 4f6f91c..45566bd 100644
--- a/src/backend/executor/nodeIndexonlyscan.c
+++ b/src/backend/executor/nodeIndexonlyscan.c
@@ -540,9 +540,9 @@ ExecInitIndexOnlyScan(IndexOnlyScan *node, EState *estate, int eflags)
*/
indexstate->ioss_ScanDesc = index_beginscan(currentRelation,
indexstate->ioss_RelationDesc,
- estate->es_snapshot,
+ estate->es_snapshot, NULL,
indexstate->ioss_NumScanKeys,
- indexstate->ioss_NumOrderByKeys);
+ indexstate->ioss_NumOrderByKeys, false);
/* Set it up for index-only scan */
indexstate->ioss_ScanDesc->xs_want_itup = true;
diff --git a/src/backend/executor/nodeIndexscan.c b/src/backend/executor/nodeIndexscan.c
index 3143bd9..77d2990 100644
--- a/src/backend/executor/nodeIndexscan.c
+++ b/src/backend/executor/nodeIndexscan.c
@@ -1022,9 +1022,9 @@ ExecInitIndexScan(IndexScan *node, EState *estate, int eflags)
*/
indexstate->iss_ScanDesc = index_beginscan(currentRelation,
indexstate->iss_RelationDesc,
- estate->es_snapshot,
+ estate->es_snapshot, NULL,
indexstate->iss_NumScanKeys,
- indexstate->iss_NumOrderByKeys);
+ indexstate->iss_NumOrderByKeys, false);
/*
* If no run-time keys to calculate, go ahead and pass the scankeys to the
diff --git a/src/backend/postmaster/pgstat.c b/src/backend/postmaster/pgstat.c
index 61e6a2c..1bbeec6 100644
--- a/src/backend/postmaster/pgstat.c
+++ b/src/backend/postmaster/pgstat.c
@@ -3386,6 +3386,9 @@ pgstat_get_wait_ipc(WaitEventIPC w)
case WAIT_EVENT_PARALLEL_FINISH:
event_name = "ParallelFinish";
break;
+ case WAIT_EVENT_PARALLEL_INDEX_SCAN:
+ event_name = "ParallelIndexScan";
+ break;
case WAIT_EVENT_SAFE_SNAPSHOT:
event_name = "SafeSnapshot";
break;
diff --git a/src/backend/utils/adt/selfuncs.c b/src/backend/utils/adt/selfuncs.c
index 4973396..9385325 100644
--- a/src/backend/utils/adt/selfuncs.c
+++ b/src/backend/utils/adt/selfuncs.c
@@ -5143,8 +5143,8 @@ get_actual_variable_range(PlannerInfo *root, VariableStatData *vardata,
* extreme value has been deleted; that case motivates not
* using SnapshotAny here.
*/
- index_scan = index_beginscan(heapRel, indexRel, &SnapshotDirty,
- 1, 0);
+ index_scan = index_beginscan(heapRel, indexRel, &SnapshotDirty, NULL,
+ 1, 0, false);
index_rescan(index_scan, scankeys, 1, NULL, 0);
/* Fetch first tuple in sortop's direction */
@@ -5175,8 +5175,8 @@ get_actual_variable_range(PlannerInfo *root, VariableStatData *vardata,
/* If max is requested, and we didn't find the index is empty */
if (max && have_data)
{
- index_scan = index_beginscan(heapRel, indexRel, &SnapshotDirty,
- 1, 0);
+ index_scan = index_beginscan(heapRel, indexRel, &SnapshotDirty, NULL,
+ 1, 0, false);
index_rescan(index_scan, scankeys, 1, NULL, 0);
/* Fetch first tuple in reverse direction */
diff --git a/src/include/access/amapi.h b/src/include/access/amapi.h
index 1036cca..e777678 100644
--- a/src/include/access/amapi.h
+++ b/src/include/access/amapi.h
@@ -108,6 +108,12 @@ typedef bool (*amproperty_function) (Oid index_oid, int attno,
/* validate definition of an opclass for this AM */
typedef bool (*amvalidate_function) (Oid opclassoid);
+/* estimate size of parallel scan descriptor */
+typedef Size (*amestimateparallelscan_function) (Size index_size);
+
+/* prepare for parallel index scan */
+typedef void (*aminitparallelscan_function) (void *target);
+
/* prepare for index scan */
typedef IndexScanDesc (*ambeginscan_function) (Relation indexRelation,
int nkeys,
@@ -120,6 +126,9 @@ typedef void (*amrescan_function) (IndexScanDesc scan,
ScanKey orderbys,
int norderbys);
+/* (re)start parallel index scan */
+typedef void (*amparallelrescan_function) (IndexScanDesc scan);
+
/* next valid tuple */
typedef bool (*amgettuple_function) (IndexScanDesc scan,
ScanDirection direction);
@@ -189,8 +198,11 @@ typedef struct IndexAmRoutine
amoptions_function amoptions;
amproperty_function amproperty; /* can be NULL */
amvalidate_function amvalidate;
+ amestimateparallelscan_function amestimateparallelscan; /* can be NULL */
+ aminitparallelscan_function aminitparallelscan; /* can be NULL */
ambeginscan_function ambeginscan;
amrescan_function amrescan;
+ amparallelrescan_function amparallelrescan; /* can be NULL */
amgettuple_function amgettuple; /* can be NULL */
amgetbitmap_function amgetbitmap; /* can be NULL */
amendscan_function amendscan;
diff --git a/src/include/access/genam.h b/src/include/access/genam.h
index 81907d5..4410ea3 100644
--- a/src/include/access/genam.h
+++ b/src/include/access/genam.h
@@ -83,6 +83,8 @@ typedef bool (*IndexBulkDeleteCallback) (ItemPointer itemptr, void *state);
typedef struct IndexScanDescData *IndexScanDesc;
typedef struct SysScanDescData *SysScanDesc;
+typedef struct ParallelIndexScanDescData *ParallelIndexScanDesc;
+
/*
* Enumeration specifying the type of uniqueness check to perform in
* index_insert().
@@ -131,16 +133,23 @@ extern bool index_insert(Relation indexRelation,
Relation heapRelation,
IndexUniqueCheck checkUnique);
+extern Size index_parallelscan_estimate(Relation indexrel, Snapshot snapshot);
+extern void index_parallelscan_initialize(Relation heaprel, Relation indexrel, Snapshot snapshot, ParallelIndexScanDesc target);
+extern IndexScanDesc index_beginscan_parallel(Relation heaprel, Relation indexrel, int nkeys,
+ int norderbys, ParallelIndexScanDesc pscan);
+
extern IndexScanDesc index_beginscan(Relation heapRelation,
Relation indexRelation,
Snapshot snapshot,
- int nkeys, int norderbys);
+ ParallelIndexScanDesc pscan,
+ int nkeys, int norderbys, bool temp_snap);
extern IndexScanDesc index_beginscan_bitmap(Relation indexRelation,
Snapshot snapshot,
int nkeys);
extern void index_rescan(IndexScanDesc scan,
ScanKey keys, int nkeys,
ScanKey orderbys, int norderbys);
+extern void index_parallelrescan(IndexScanDesc scan);
extern void index_endscan(IndexScanDesc scan);
extern void index_markpos(IndexScanDesc scan);
extern void index_restrpos(IndexScanDesc scan);
diff --git a/src/include/access/nbtree.h b/src/include/access/nbtree.h
index c580f51..1c8aa8a 100644
--- a/src/include/access/nbtree.h
+++ b/src/include/access/nbtree.h
@@ -609,6 +609,8 @@ typedef struct BTScanOpaqueData
ScanKey arrayKeyData; /* modified copy of scan->keyData */
int numArrayKeys; /* number of equality-type array keys (-1 if
* there are any unsatisfiable array keys) */
+ int arrayKeyCount; /* count indicating number of array scan keys
+ * processed */
BTArrayKeyInfo *arrayKeys; /* info about each equality-type array key */
MemoryContext arrayContext; /* scan-lifespan context for array data */
@@ -652,7 +654,25 @@ typedef BTScanOpaqueData *BTScanOpaque;
#define SK_BT_NULLS_FIRST (INDOPTION_NULLS_FIRST << SK_BT_INDOPTION_SHIFT)
/*
- * prototypes for functions in nbtree.c (external entry points for btree)
+ * The flags below are used to indicate the state of a parallel scan.
+ *
+ * BTPARALLEL_NOT_INITIALIZED implies that the scan is not started
+ *
+ * BTPARALLEL_ADVANCING implies one of the worker or backend is advancing the
+ * scan to a new page; others must wait.
+ *
+ * BTPARALLEL_IDLE implies that no backend is advancing the scan; someone can
+ * start doing it
+ *
+ * BTPARALLEL_DONE implies that the scan is complete (including error exit)
+ */
+#define BTPARALLEL_NOT_INITIALIZED 0x01
+#define BTPARALLEL_ADVANCING 0x02
+#define BTPARALLEL_DONE 0x03
+#define BTPARALLEL_IDLE 0x04
+/*
+ * prototypes for functions in nbtree.c (external entry points for btree and
+ * functions to maintain state of parallel scan)
*/
extern Datum bthandler(PG_FUNCTION_ARGS);
extern IndexBuildResult *btbuild(Relation heap, Relation index,
@@ -662,10 +682,17 @@ extern bool btinsert(Relation rel, Datum *values, bool *isnull,
ItemPointer ht_ctid, Relation heapRel,
IndexUniqueCheck checkUnique);
extern IndexScanDesc btbeginscan(Relation rel, int nkeys, int norderbys);
+extern Size btestimateparallelscan(Size size);
+extern void btinitparallelscan(void *target);
+extern BlockNumber _bt_parallel_seize(IndexScanDesc scan, bool *status);
+extern void _bt_parallel_release(IndexScanDesc scan, BlockNumber next_page);
+extern void _bt_parallel_done(IndexScanDesc scan);
+extern void _bt_parallel_advance_scan(IndexScanDesc scan);
extern bool btgettuple(IndexScanDesc scan, ScanDirection dir);
extern int64 btgetbitmap(IndexScanDesc scan, TIDBitmap *tbm);
extern void btrescan(IndexScanDesc scan, ScanKey scankey, int nscankeys,
ScanKey orderbys, int norderbys);
+extern void btparallelrescan(IndexScanDesc scan);
extern void btendscan(IndexScanDesc scan);
extern void btmarkpos(IndexScanDesc scan);
extern void btrestrpos(IndexScanDesc scan);
diff --git a/src/include/access/relscan.h b/src/include/access/relscan.h
index de98dd6..ca843f9 100644
--- a/src/include/access/relscan.h
+++ b/src/include/access/relscan.h
@@ -21,6 +21,9 @@
#include "access/tupdesc.h"
#include "storage/spin.h"
+#define OffsetToPointer(base, offset)\
+((void *)((char *)base + offset))
+
/*
* Shared state for parallel heap scan.
*
@@ -126,8 +129,20 @@ typedef struct IndexScanDescData
/* state data for traversing HOT chains in index_getnext */
bool xs_continue_hot; /* T if must keep walking HOT chain */
+ bool xs_temp_snap; /* unregister snapshot at scan end? */
+ ParallelIndexScanDesc parallel_scan; /* parallel index scan
+ * information */
} IndexScanDescData;
+/* Generic structure for parallel scans */
+typedef struct ParallelIndexScanDescData
+{
+ Oid ps_relid;
+ Oid ps_indexid;
+ Size ps_offset; /* Offset in bytes of am specific structure */
+ char ps_snapshot_data[FLEXIBLE_ARRAY_MEMBER];
+} ParallelIndexScanDescData;
+
/* Struct for heap-or-index scans of system tables */
typedef struct SysScanDescData
{
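
For orientation (an illustration, not part of the patch):
index_parallelscan_initialize lays the shared area out as the generic
descriptor, the serialized snapshot, and then the AM-specific data starting at
ps_offset, which consumers locate with OffsetToPointer.  The pscan_am_area
helper below is hypothetical.

/*
 *   ParallelIndexScanDescData (in dynamic shared memory)
 *   +---------------------------------------------+
 *   | ps_relid, ps_indexid, ps_offset             |
 *   | ps_snapshot_data[]  (serialized snapshot)   |
 *   +---------------------------------------------+  <- ps_offset bytes in
 *   | AM-specific area (e.g. BTParallelScanDescData)
 *   +---------------------------------------------+
 */
static inline void *
pscan_am_area(ParallelIndexScanDesc pscan)
{
	/* same computation the btree code performs via OffsetToPointer */
	return OffsetToPointer((void *) pscan, pscan->ps_offset);
}
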
diff --git a/src/include/pgstat.h b/src/include/pgstat.h
index 282f8ae..f038f64 100644
--- a/src/include/pgstat.h
+++ b/src/include/pgstat.h
@@ -784,6 +784,7 @@ typedef enum
WAIT_EVENT_MQ_RECEIVE,
WAIT_EVENT_MQ_SEND,
WAIT_EVENT_PARALLEL_FINISH,
+ WAIT_EVENT_PARALLEL_INDEX_SCAN,
WAIT_EVENT_SAFE_SNAPSHOT,
WAIT_EVENT_SYNC_REP
} WaitEventIPC;
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index 993880d..0192810 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -161,6 +161,7 @@ BTPageOpaque
BTPageOpaqueData
BTPageStat
BTPageState
+BTParallelScanDesc
BTScanOpaque
BTScanOpaqueData
BTScanPos
@@ -1264,6 +1265,8 @@ OverrideSearchPath
OverrideStackEntry
PACE_HEADER
PACL
+ParallelIndexScanDesc
+ParallelIndexScanDescData
PATH
PBOOL
PCtxtHandle