diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c index bcf9871..096166e 100644 --- a/src/backend/access/heap/heapam.c +++ b/src/backend/access/heap/heapam.c @@ -63,6 +63,7 @@ #include "storage/predicate.h" #include "storage/procarray.h" #include "storage/smgr.h" +#include "storage/spin.h" #include "storage/standby.h" #include "utils/datum.h" #include "utils/inval.h" @@ -80,12 +81,16 @@ bool synchronize_seqscans = true; static HeapScanDesc heap_beginscan_internal(Relation relation, Snapshot snapshot, int nkeys, ScanKey key, + ParallelHeapScanDesc parallel_scan, bool allow_strat, bool allow_sync, bool allow_pagemode, bool is_bitmapscan, bool is_samplescan, bool temp_snap); +static BlockNumber heap_parallelscan_nextpage(HeapScanDesc scan, + bool *pscan_finished); +static void heap_parallelscan_initialize_startblock(HeapScanDesc scan); static HeapTuple heap_prepare_insert(Relation relation, HeapTuple tup, TransactionId xid, CommandId cid, int options); static XLogRecPtr log_heap_update(Relation reln, Buffer oldbuf, @@ -226,7 +231,10 @@ initscan(HeapScanDesc scan, ScanKey key, bool keep_startblock) * results for a non-MVCC snapshot, the caller must hold some higher-level * lock that ensures the interesting tuple(s) won't change.) 
*/ - scan->rs_nblocks = RelationGetNumberOfBlocks(scan->rs_rd); + if (scan->rs_parallel != NULL) + scan->rs_nblocks = scan->rs_parallel->phs_nblocks; + else + scan->rs_nblocks = RelationGetNumberOfBlocks(scan->rs_rd); /* * If the table is large relative to NBuffers, use a bulk-read access @@ -272,7 +280,10 @@ initscan(HeapScanDesc scan, ScanKey key, bool keep_startblock) else if (allow_sync && synchronize_seqscans) { scan->rs_syncscan = true; - scan->rs_startblock = ss_get_location(scan->rs_rd, scan->rs_nblocks); + if (scan->rs_parallel != NULL) + heap_parallelscan_initialize_startblock(scan); + else + scan->rs_startblock = ss_get_location(scan->rs_rd, scan->rs_nblocks); } else { @@ -496,7 +507,32 @@ heapgettup(HeapScanDesc scan, tuple->t_data = NULL; return; } - page = scan->rs_startblock; /* first page */ + if (scan->rs_parallel != NULL) + { + bool pscan_finished; + + page = heap_parallelscan_nextpage(scan, &pscan_finished); + + /* + * Return NULL if the scan is finished. It can so happen that + * by the time one of workers started the scan, others have + * already completed scanning the relation, so this worker won't + * need to perform scan. Report scan location before finishing the + * scan so that the final state of the position hint is back at the + * start of the rel. 
+ */ + if (pscan_finished) + { + if (scan->rs_syncscan) + ss_report_location(scan->rs_rd, page); + + Assert(!BufferIsValid(scan->rs_cbuf)); + tuple->t_data = NULL; + return; + } + } + else + page = scan->rs_startblock; /* first page */ heapgetpage(scan, page); lineoff = FirstOffsetNumber; /* first offnum */ scan->rs_inited = true; @@ -519,6 +555,9 @@ heapgettup(HeapScanDesc scan, } else if (backward) { + /* backward parallel scan not supported */ + Assert(scan->rs_parallel == NULL); + if (!scan->rs_inited) { /* @@ -671,11 +710,22 @@ heapgettup(HeapScanDesc scan, } else { - page++; - if (page >= scan->rs_nblocks) - page = 0; - finished = (page == scan->rs_startblock) || - (scan->rs_numblocks != InvalidBlockNumber ? --scan->rs_numblocks == 0 : false); + if (scan->rs_parallel != NULL) + { + bool pscan_finished = false; + + page = heap_parallelscan_nextpage(scan, &pscan_finished); + finished = pscan_finished; + } + else + { + page++; + if (page >= scan->rs_nblocks) + page = 0; + + finished = (page == scan->rs_startblock) || + (scan->rs_numblocks != InvalidBlockNumber ? --scan->rs_numblocks <= 0 : false); + } /* * Report our new scan position for synchronization purposes. We @@ -773,7 +823,32 @@ heapgettup_pagemode(HeapScanDesc scan, tuple->t_data = NULL; return; } - page = scan->rs_startblock; /* first page */ + if (scan->rs_parallel != NULL) + { + bool pscan_finished; + + page = heap_parallelscan_nextpage(scan, &pscan_finished); + + /* + * Return NULL if the scan is finished. It can so happen that + * by the time one of workers started the scan, others have + * already completed scanning the relation, so this worker won't + * need to perform scan. Report scan location before finishing the + * scan so that the final state of the position hint is back at the + * start of the rel. 
+ */ + if (pscan_finished) + { + if (scan->rs_syncscan) + ss_report_location(scan->rs_rd, page); + + Assert(!BufferIsValid(scan->rs_cbuf)); + tuple->t_data = NULL; + return; + } + } + else + page = scan->rs_startblock; /* first page */ heapgetpage(scan, page); lineindex = 0; scan->rs_inited = true; @@ -793,6 +868,9 @@ heapgettup_pagemode(HeapScanDesc scan, } else if (backward) { + /* backward parallel scan not supported */ + Assert(scan->rs_parallel == NULL); + if (!scan->rs_inited) { /* @@ -934,11 +1012,22 @@ heapgettup_pagemode(HeapScanDesc scan, } else { - page++; - if (page >= scan->rs_nblocks) - page = 0; - finished = (page == scan->rs_startblock) || - (scan->rs_numblocks != InvalidBlockNumber ? --scan->rs_numblocks == 0 : false); + if (scan->rs_parallel != NULL) + { + bool pscan_finished = false; + + page = heap_parallelscan_nextpage(scan, &pscan_finished); + finished = pscan_finished; + } + else + { + page++; + if (page >= scan->rs_nblocks) + page = 0; + + finished = (page == scan->rs_startblock) || + (scan->rs_numblocks != InvalidBlockNumber ? --scan->rs_numblocks <= 0 : false); + } /* * Report our new scan position for synchronization purposes. 
We @@ -1341,7 +1430,7 @@ HeapScanDesc heap_beginscan(Relation relation, Snapshot snapshot, int nkeys, ScanKey key) { - return heap_beginscan_internal(relation, snapshot, nkeys, key, + return heap_beginscan_internal(relation, snapshot, nkeys, key, NULL, true, true, true, false, false, false); } @@ -1351,7 +1440,7 @@ heap_beginscan_catalog(Relation relation, int nkeys, ScanKey key) Oid relid = RelationGetRelid(relation); Snapshot snapshot = RegisterSnapshot(GetCatalogSnapshot(relid)); - return heap_beginscan_internal(relation, snapshot, nkeys, key, + return heap_beginscan_internal(relation, snapshot, nkeys, key, NULL, true, true, true, false, false, true); } @@ -1360,7 +1449,7 @@ heap_beginscan_strat(Relation relation, Snapshot snapshot, int nkeys, ScanKey key, bool allow_strat, bool allow_sync) { - return heap_beginscan_internal(relation, snapshot, nkeys, key, + return heap_beginscan_internal(relation, snapshot, nkeys, key, NULL, allow_strat, allow_sync, true, false, false, false); } @@ -1369,7 +1458,7 @@ HeapScanDesc heap_beginscan_bm(Relation relation, Snapshot snapshot, int nkeys, ScanKey key) { - return heap_beginscan_internal(relation, snapshot, nkeys, key, + return heap_beginscan_internal(relation, snapshot, nkeys, key, NULL, false, false, true, true, false, false); } @@ -1378,7 +1467,7 @@ heap_beginscan_sampling(Relation relation, Snapshot snapshot, int nkeys, ScanKey key, bool allow_strat, bool allow_sync, bool allow_pagemode) { - return heap_beginscan_internal(relation, snapshot, nkeys, key, + return heap_beginscan_internal(relation, snapshot, nkeys, key, NULL, allow_strat, allow_sync, allow_pagemode, false, true, false); } @@ -1386,6 +1475,7 @@ heap_beginscan_sampling(Relation relation, Snapshot snapshot, static HeapScanDesc heap_beginscan_internal(Relation relation, Snapshot snapshot, int nkeys, ScanKey key, + ParallelHeapScanDesc parallel_scan, bool allow_strat, bool allow_sync, bool allow_pagemode, @@ -1418,6 +1508,7 @@ heap_beginscan_internal(Relation 
relation, Snapshot snapshot,
 	scan->rs_allow_strat = allow_strat;
 	scan->rs_allow_sync = allow_sync;
 	scan->rs_temp_snap = temp_snap;
+	scan->rs_parallel = parallel_scan;
 
 	/*
 	 * we can use page-at-a-time mode if it's an MVCC-safe snapshot
@@ -1532,6 +1623,186 @@ heap_endscan(HeapScanDesc scan)
 }
 
 /* ----------------
+ *		heap_parallelscan_estimate - estimate storage for ParallelHeapScanDesc
+ *
+ *		Sadly, this doesn't reduce to a constant, because the size required
+ *		to serialize the snapshot can vary.
+ * ----------------
+ */
+Size
+heap_parallelscan_estimate(Snapshot snapshot)
+{
+	return add_size(offsetof(ParallelHeapScanDescData, phs_snapshot_data),
+					EstimateSnapshotSpace(snapshot));
+}
+
+/* ----------------
+ *		heap_parallelscan_initialize - initialize ParallelHeapScanDesc
+ *
+ *		Must allow as many bytes of shared memory as returned by
+ *		heap_parallelscan_estimate.  Call this just once in the leader
+ *		process; then, individual workers attach via heap_beginscan_parallel.
+ * ----------------
+ */
+void
+heap_parallelscan_initialize(ParallelHeapScanDesc target, Relation relation,
+							 Snapshot snapshot)
+{
+	target->phs_relid = RelationGetRelid(relation);
+	target->phs_nblocks = RelationGetNumberOfBlocks(relation);
+	SpinLockInit(&target->phs_mutex);
+	target->phs_cblock = InvalidBlockNumber;
+
+	/*
+	 * heap_parallelscan_nextpage compares against phs_startblock even when
+	 * heap_parallelscan_initialize_startblock never runs (sync scan not
+	 * used), so give it a defined value: such scans start at block 0.
+	 */
+	target->phs_startblock = 0;
+	target->phs_firstpass = true;
+	SerializeSnapshot(snapshot, target->phs_snapshot_data);
+}
+
+/* ----------------
+ *		heap_parallelscan_initialize_startblock - initialize the startblock for
+ *					parallel scan.
+ *
+ *		Only the first worker of parallel scan will initialize the start
+ *		block for scan and others will use that information to indicate
+ *		the end of scan.
+ * ----------------
+ */
+static void
+heap_parallelscan_initialize_startblock(HeapScanDesc scan)
+{
+	ParallelHeapScanDesc parallel_scan;
+	BlockNumber sync_startpage = InvalidBlockNumber;
+
+	Assert(scan->rs_parallel);
+
+	parallel_scan = scan->rs_parallel;
+
+retry:
+	SpinLockAcquire(&parallel_scan->phs_mutex);
+
+	/*
+	 * InvalidBlockNumber indicates that this initialization is done for
+	 * first worker.
+	 */
+	if (parallel_scan->phs_cblock == InvalidBlockNumber)
+	{
+		if (sync_startpage == InvalidBlockNumber)
+		{
+			/*
+			 * ss_get_location() acquires an LWLock, which must not be done
+			 * while holding a spinlock.  Release the spinlock, fetch the
+			 * location, and retry; if another worker set the start block
+			 * in the meantime, its choice is used instead.
+			 */
+			SpinLockRelease(&parallel_scan->phs_mutex);
+			sync_startpage = ss_get_location(scan->rs_rd, scan->rs_nblocks);
+			goto retry;
+		}
+		parallel_scan->phs_cblock = sync_startpage;
+		parallel_scan->phs_startblock = sync_startpage;
+	}
+	scan->rs_startblock = parallel_scan->phs_startblock;
+	SpinLockRelease(&parallel_scan->phs_mutex);
+}
+
+/* ----------------
+ *		heap_parallelscan_nextpage - get the next page to scan
+ *
+ *		Scanning till the position from where the parallel scan has started
+ *		indicates end of scan.  Note, however, that other backends could still
+ *		be scanning if they grabbed a page to scan and aren't done with it yet.
+ *		Resets the current position for parallel scan to the beginning of
+ *		relation, if next page to scan is greater than total number of pages in
+ *		relation.
+ *
+ *		Returns the block handed to this backend; *pscan_finished is set to
+ *		true once the shared position is back at phs_startblock.
+ * ----------------
+ */
+static BlockNumber
+heap_parallelscan_nextpage(HeapScanDesc scan,
+						   bool *pscan_finished)
+{
+	BlockNumber page = InvalidBlockNumber;
+	ParallelHeapScanDesc parallel_scan;
+
+	Assert(scan->rs_parallel);
+
+	parallel_scan = scan->rs_parallel;
+
+	*pscan_finished = false;
+
+	/* we treat InvalidBlockNumber specially here to avoid overflow */
+	SpinLockAcquire(&parallel_scan->phs_mutex);
+	if (parallel_scan->phs_cblock != InvalidBlockNumber)
+		page = parallel_scan->phs_cblock++;
+
+	if (page >= scan->rs_nblocks)
+	{
+		parallel_scan->phs_cblock = 0;
+		page = parallel_scan->phs_cblock++;
+	}
+
+	/*
+	 * scan position will be same as start position once during start
+	 * of scan and then at end of scan.
+	 */
+	if (parallel_scan->phs_firstpass && page == parallel_scan->phs_startblock)
+		parallel_scan->phs_firstpass = false;
+	else if (!parallel_scan->phs_firstpass && page == parallel_scan->phs_startblock)
+	{
+		*pscan_finished = true;
+		parallel_scan->phs_cblock--;
+	}
+	SpinLockRelease(&parallel_scan->phs_mutex);
+
+	return page;
+}
+
+/* ----------------
+ *		heap_beginscan_parallel - join a parallel scan
+ *
+ *		Caller must hold a suitable lock on the correct relation.
+ * ----------------
+ */
+HeapScanDesc
+heap_beginscan_parallel(Relation relation, ParallelHeapScanDesc parallel_scan)
+{
+	Snapshot	snapshot;
+
+	Assert(RelationGetRelid(relation) == parallel_scan->phs_relid);
+	snapshot = RestoreSnapshot(parallel_scan->phs_snapshot_data);
+	RegisterSnapshot(snapshot);
+
+	return heap_beginscan_internal(relation, snapshot, 0, NULL, parallel_scan,
+								   true, true, true, false, false, true);
+}
+
+/* ----------------
+ *		heap_parallel_rescan - restart a parallel relation scan
+ *
+ *		NOTE(review): the shared scan position (phs_cblock, phs_firstpass)
+ *		is not reset here; presumably the caller passes a freshly
+ *		initialized descriptor -- confirm.
+ * ----------------
+ */
+void
+heap_parallel_rescan(ParallelHeapScanDesc pscan,
+					 HeapScanDesc scan)
+{
+	if (pscan != NULL)
+		scan->rs_parallel = pscan;
+
+	heap_rescan(scan,		/* scan desc */
+				NULL);		/* new scan keys */
+}
+
+/* ----------------
  *		heap_getnext - retrieve next tuple in scan
  *
  *		Fix to work with index relations.
diff --git a/src/backend/commands/explain.c b/src/backend/commands/explain.c index e99ad56..9fe5d91 100644 --- a/src/backend/commands/explain.c +++ b/src/backend/commands/explain.c @@ -731,6 +731,7 @@ ExplainPreScanNode(PlanState *planstate, Bitmapset **rels_used) { case T_SeqScan: case T_SampleScan: + case T_PartialSeqScan: case T_Funnel: case T_IndexScan: case T_IndexOnlyScan: @@ -855,6 +856,9 @@ ExplainNode(PlanState *planstate, List *ancestors, case T_SampleScan: pname = sname = "Sample Scan"; break; + case T_PartialSeqScan: + pname = sname = "Partial Seq Scan"; + break; case T_Funnel: pname = sname = "Funnel"; break; @@ -1008,6 +1012,7 @@ ExplainNode(PlanState *planstate, List *ancestors, { case T_SeqScan: case T_SampleScan: + case T_PartialSeqScan: case T_Funnel: case T_BitmapHeapScan: case T_TidScan: @@ -1282,6 +1287,7 @@ ExplainNode(PlanState *planstate, List *ancestors, planstate, ancestors, es); /* FALL THRU to print additional fields the same as SeqScan */ case T_SeqScan: + case T_PartialSeqScan: case T_ValuesScan: case T_CteScan: case T_WorkTableScan: @@ -2358,6 +2364,7 @@ ExplainTargetRel(Plan *plan, Index rti, ExplainState *es) { case T_SeqScan: case T_SampleScan: + case T_PartialSeqScan: case T_Funnel: case T_IndexScan: case T_IndexOnlyScan: diff --git a/src/backend/executor/Makefile b/src/backend/executor/Makefile index fb34864..9ccb40e 100644 --- a/src/backend/executor/Makefile +++ b/src/backend/executor/Makefile @@ -20,8 +20,8 @@ OBJS = execAmi.o execCurrent.o execGrouping.o execIndexing.o execJunk.o \ nodeHash.o nodeHashjoin.o nodeIndexscan.o nodeIndexonlyscan.o \ nodeLimit.o nodeLockRows.o \ nodeMaterial.o nodeMergeAppend.o nodeMergejoin.o nodeModifyTable.o \ - nodeNestloop.o nodeFunctionscan.o nodeRecursiveunion.o nodeResult.o \ - nodeSamplescan.o nodeSeqscan.o nodeSetOp.o nodeSort.o nodeUnique.o \ + nodeNestloop.o nodeFunctionscan.o nodePartialSeqscan.o nodeRecursiveunion.o \ + nodeResult.o nodeSamplescan.o nodeSeqscan.o nodeSetOp.o nodeSort.o 
nodeUnique.o \ nodeValuesscan.o nodeCtescan.o nodeWorktablescan.o \ nodeGroup.o nodeSubplan.o nodeSubqueryscan.o nodeTidscan.o \ nodeForeignscan.o nodeWindowAgg.o tstoreReceiver.o tqueue.o spi.o diff --git a/src/backend/executor/execAmi.c b/src/backend/executor/execAmi.c index 4915151..dc45c20 100644 --- a/src/backend/executor/execAmi.c +++ b/src/backend/executor/execAmi.c @@ -38,6 +38,7 @@ #include "executor/nodeMergejoin.h" #include "executor/nodeModifyTable.h" #include "executor/nodeNestloop.h" +#include "executor/nodePartialSeqscan.h" #include "executor/nodeRecursiveunion.h" #include "executor/nodeResult.h" #include "executor/nodeSamplescan.h" @@ -161,6 +162,10 @@ ExecReScan(PlanState *node) ExecReScanSampleScan((SampleScanState *) node); break; + case T_PartialSeqScanState: + ExecReScanPartialSeqScan((PartialSeqScanState *) node); + break; + case T_FunnelState: ExecReScanFunnel((FunnelState *) node); break; @@ -473,6 +478,7 @@ ExecSupportsBackwardScan(Plan *node) return false; case T_Funnel: + case T_PartialSeqScan: return false; case T_IndexScan: diff --git a/src/backend/executor/execCurrent.c b/src/backend/executor/execCurrent.c index 650fcc5..7a44462 100644 --- a/src/backend/executor/execCurrent.c +++ b/src/backend/executor/execCurrent.c @@ -262,6 +262,7 @@ search_plan_tree(PlanState *node, Oid table_oid) */ case T_SeqScanState: case T_SampleScanState: + case T_PartialSeqScanState: case T_FunnelState: case T_IndexScanState: case T_IndexOnlyScanState: diff --git a/src/backend/executor/execParallel.c b/src/backend/executor/execParallel.c index 726bc7f..e24478a 100644 --- a/src/backend/executor/execParallel.c +++ b/src/backend/executor/execParallel.c @@ -17,6 +17,7 @@ #include "executor/execParallel.h" #include "executor/nodeFunnel.h" +#include "executor/nodePartialSeqscan.h" #include "nodes/nodeFuncs.h" #include "optimizer/planmain.h" #include "optimizer/planner.h" @@ -297,6 +298,18 @@ ExecParallelEstimate(Node *node, parallel_estimate_ctx *pestcontext) 
switch (nodeTag(node)) { + case T_PartialSeqScanState: + { + EState *estate = ((PartialSeqScanState *) node)->ss.ps.state; + + *pestcontext->psize = heap_parallelscan_estimate(estate->es_snapshot); + shm_toc_estimate_chunk(&pestcontext->context->estimator, *pestcontext->psize); + + /* key for paratial scan information. */ + shm_toc_estimate_keys(&pestcontext->context->estimator, 1); + return true; + } + break; default: break; } @@ -312,11 +325,30 @@ ExecParallelEstimate(Node *node, parallel_estimate_ctx *pestcontext) static bool ExecParallelInitializeDSM(Node *node, parallel_estimate_ctx *pestcontext) { + ParallelHeapScanDesc pscan; + if (node == NULL) return false; switch (nodeTag(node)) { + case T_PartialSeqScanState: + { + EState *estate = ((PartialSeqScanState *) node)->ss.ps.state; + + /* + * Store parallel heap scan descriptor in dynamic shared + * memory. + */ + pscan = shm_toc_allocate(pestcontext->context->toc, + *pestcontext->psize); + heap_parallelscan_initialize(pscan, + ((PartialSeqScanState *) node)->ss.ss_currentRelation, + estate->es_snapshot); + shm_toc_insert(pestcontext->context->toc, PARALLEL_KEY_SCAN, pscan); + return true; + } + break; default: break; } diff --git a/src/backend/executor/execProcnode.c b/src/backend/executor/execProcnode.c index c181bf2..e24a439 100644 --- a/src/backend/executor/execProcnode.c +++ b/src/backend/executor/execProcnode.c @@ -100,6 +100,7 @@ #include "executor/nodeMergejoin.h" #include "executor/nodeModifyTable.h" #include "executor/nodeNestloop.h" +#include "executor/nodePartialSeqscan.h" #include "executor/nodeFunnel.h" #include "executor/nodeRecursiveunion.h" #include "executor/nodeResult.h" @@ -197,6 +198,11 @@ ExecInitNode(Plan *node, EState *estate, int eflags) estate, eflags); break; + case T_PartialSeqScan: + result = (PlanState *) ExecInitPartialSeqScan((PartialSeqScan *) node, + estate, eflags); + break; + case T_Funnel: result = (PlanState *) ExecInitFunnel((Funnel *) node, estate, eflags); @@ -422,6 
+428,10 @@ ExecProcNode(PlanState *node)
 			result = ExecSampleScan((SampleScanState *) node);
 			break;
 
+		case T_PartialSeqScanState:
+			result = ExecPartialSeqScan((PartialSeqScanState *) node);
+			break;
+
 		case T_FunnelState:
 			result = ExecFunnel((FunnelState *) node);
 			break;
@@ -668,6 +678,10 @@ ExecEndNode(PlanState *node)
 			ExecEndSampleScan((SampleScanState *) node);
 			break;
 
+		case T_PartialSeqScanState:
+			ExecEndPartialSeqScan((PartialSeqScanState *) node);
+			break;
+
 		case T_FunnelState:
 			ExecEndFunnel((FunnelState *) node);
 			break;
diff --git a/src/backend/executor/nodePartialSeqscan.c b/src/backend/executor/nodePartialSeqscan.c
new file mode 100644
index 0000000..e4a125a
--- /dev/null
+++ b/src/backend/executor/nodePartialSeqscan.c
@@ -0,0 +1,308 @@
+/*-------------------------------------------------------------------------
+ *
+ * nodePartialSeqscan.c
+ *	  Support routines for partial sequential scans of relations.
+ *
+ * Portions Copyright (c) 1996-2015, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ *
+ * IDENTIFICATION
+ *	  src/backend/executor/nodePartialSeqscan.c
+ *
+ *-------------------------------------------------------------------------
+ */
+/*
+ * INTERFACE ROUTINES
+ *		ExecPartialSeqScan				scans a relation partially.
+ *		PartialSeqNext					retrieves next tuple from heap.
+ *		ExecInitPartialSeqScan			creates and initializes a partial seqscan node.
+ *		ExecEndPartialSeqScan			releases any storage allocated.
+ */
+#include "postgres.h"
+
+#include "access/relscan.h"
+#include "executor/execdebug.h"
+#include "executor/execParallel.h"
+#include "executor/nodePartialSeqscan.h"
+#include "utils/rel.h"
+
+
+
+/* ----------------------------------------------------------------
+ *						Scan Support
+ * ----------------------------------------------------------------
+ */
+
+/* ----------------------------------------------------------------
+ *		PartialSeqNext
+ *
+ *		This is a workhorse for ExecPartialSeqScan
+ * ----------------------------------------------------------------
+ */
+static TupleTableSlot *
+PartialSeqNext(PartialSeqScanState *node)
+{
+	HeapTuple	tuple;
+	HeapScanDesc scandesc;
+	EState	   *estate;
+	ScanDirection direction;
+	TupleTableSlot *slot;
+
+	/*
+	 * get information from the estate and scan state
+	 */
+	scandesc = node->ss.ss_currentScanDesc;
+	estate = node->ss.ps.state;
+	direction = estate->es_direction;
+	slot = node->ss.ss_ScanTupleSlot;
+
+	/*
+	 * get the next tuple from the table
+	 */
+	tuple = heap_getnext(scandesc, direction);
+
+	/*
+	 * save the tuple and the buffer returned to us by the access methods in
+	 * our scan tuple slot and return the slot.  Note: we pass 'false' because
+	 * tuples returned by heap_getnext() are pointers onto disk pages and were
+	 * not created with palloc() and so should not be pfree()'d.  Note also
+	 * that ExecStoreTuple will increment the refcount of the buffer; the
+	 * refcount will not be dropped until the tuple table slot is cleared.
+	 */
+	if (tuple)
+		ExecStoreTuple(tuple,	/* tuple to store */
+					   slot,	/* slot to store in */
+					   scandesc->rs_cbuf,		/* buffer associated with this
+												 * tuple */
+					   false);	/* don't pfree this pointer */
+	else
+		ExecClearTuple(slot);
+
+	return slot;
+}
+
+/*
+ * PartialSeqRecheck -- access method routine to recheck a tuple in EvalPlanQual
+ */
+static bool
+PartialSeqRecheck(PartialSeqScanState *node, TupleTableSlot *slot)
+{
+	/*
+	 * Note that unlike IndexScan, PartialSeqScan never uses scan keys (the
+	 * scan is started via heap_beginscan_parallel), so there is nothing to
+	 * recheck here and every tuple is accepted.
+	 */
+	return true;
+}
+
+/* ----------------------------------------------------------------
+ *		InitPartialScanRelation
+ *
+ *		Set up to access the scan relation.
+ * ----------------------------------------------------------------
+ */
+static void
+InitPartialScanRelation(PartialSeqScanState *node, EState *estate, int eflags)
+{
+	Relation	currentRelation;
+	shm_toc    *toc;
+
+	/*
+	 * get the relation object id from the relid'th entry in the range table,
+	 * open that relation and acquire appropriate lock on it.
+	 */
+	currentRelation = ExecOpenScanRelation(estate,
+									  ((Scan *) node->ss.ps.plan)->scanrelid,
+										   eflags);
+
+	/*
+	 * Parallel scan descriptor is initialized and stored in dynamic shared
+	 * memory segment by master backend and parallel workers retrieve it from
+	 * shared memory.  We set 'toc' (place to lookup parallel scan descriptor)
+	 * as retrieved by attaching to dsm for parallel workers whereas master
+	 * backend stores it directly in partial scan state node after
+	 * initializing workers.
+	 */
+	toc = GetParallelShmToc();
+	if (toc)
+		node->ss.ps.toc = toc;
+
+	node->ss.ss_currentRelation = currentRelation;
+
+	/* and report the scan tuple slot's rowtype */
+	ExecAssignScanType(&node->ss, RelationGetDescr(currentRelation));
+}
+
+/* ----------------------------------------------------------------
+ *		ExecInitPartialSeqScan
+ * ----------------------------------------------------------------
+ */
+PartialSeqScanState *
+ExecInitPartialSeqScan(PartialSeqScan *node, EState *estate, int eflags)
+{
+	PartialSeqScanState *scanstate;
+
+	/*
+	 * Once upon a time it was possible to have an outerPlan of a SeqScan, but
+	 * not any more.
+	 */
+	Assert(outerPlan(node) == NULL);
+	Assert(innerPlan(node) == NULL);
+
+	/*
+	 * create state structure
+	 */
+	scanstate = makeNode(PartialSeqScanState);
+	scanstate->ss.ps.plan = (Plan *) node;
+	scanstate->ss.ps.state = estate;
+
+	/*
+	 * Miscellaneous initialization
+	 *
+	 * create expression context for node
+	 */
+	ExecAssignExprContext(estate, &scanstate->ss.ps);
+
+	/*
+	 * initialize child expressions
+	 */
+	scanstate->ss.ps.targetlist = (List *)
+		ExecInitExpr((Expr *) node->plan.targetlist,
+					 (PlanState *) scanstate);
+	scanstate->ss.ps.qual = (List *)
+		ExecInitExpr((Expr *) node->plan.qual,
+					 (PlanState *) scanstate);
+
+	/*
+	 * tuple table initialization
+	 */
+	ExecInitResultTupleSlot(estate, &scanstate->ss.ps);
+	ExecInitScanTupleSlot(estate, &scanstate->ss);
+
+	/*
+	 * initialize scan relation
+	 */
+	InitPartialScanRelation(scanstate, estate, eflags);
+
+	scanstate->ss.ps.ps_TupFromTlist = false;
+
+	/*
+	 * Initialize result tuple type and projection info.
+	 */
+	ExecAssignResultTypeFromTL(&scanstate->ss.ps);
+	ExecAssignScanProjectionInfo(&scanstate->ss);
+
+	return scanstate;
+}
+
+/* ----------------------------------------------------------------
+ *		ExecPartialSeqScan(node)
+ *
+ *		Scans the relation and returns the next qualifying tuple.
+ *		We call the ExecScan() routine and pass it the appropriate
+ *		access method functions.
+ * ----------------------------------------------------------------
+ */
+TupleTableSlot *
+ExecPartialSeqScan(PartialSeqScanState *node)
+{
+	/*
+	 * Initialize the scan on first execution, normally we initialize it
+	 * during ExecutorStart phase, however we need ParallelHeapScanDesc to
+	 * initialize the scan in case of this node and the same is initialized by
+	 * the Funnel node during ExecutorRun phase.
+	 */
+	if (!node->scan_initialized)
+	{
+		ParallelHeapScanDesc pscan;
+
+		/*
+		 * Parallel scan descriptor is initialized and stored in dynamic
+		 * shared memory segment by master backend, parallel workers and local
+		 * scan by master backend retrieve it from shared memory.  If the scan
+		 * descriptor is available on first execution, then we need to
+		 * re-initialize for rescan.
+		 */
+		Assert(node->ss.ps.toc);
+
+		pscan = shm_toc_lookup(node->ss.ps.toc, PARALLEL_KEY_SCAN);
+
+		if (!node->ss.ss_currentScanDesc)
+		{
+			node->ss.ss_currentScanDesc =
+				heap_beginscan_parallel(node->ss.ss_currentRelation, pscan);
+		}
+		else
+		{
+			heap_parallel_rescan(pscan, node->ss.ss_currentScanDesc);
+		}
+
+		node->scan_initialized = true;
+	}
+
+	return ExecScan((ScanState *) node,
+					(ExecScanAccessMtd) PartialSeqNext,
+					(ExecScanRecheckMtd) PartialSeqRecheck);
+}
+
+/* ----------------------------------------------------------------
+ *		ExecEndPartialSeqScan
+ *
+ *		frees any storage allocated through C routines.
+ * ----------------------------------------------------------------
+ */
+void
+ExecEndPartialSeqScan(PartialSeqScanState *node)
+{
+	Relation	relation;
+	HeapScanDesc scanDesc;
+
+	/*
+	 * get information from node
+	 */
+	relation = node->ss.ss_currentRelation;
+	scanDesc = node->ss.ss_currentScanDesc;
+
+	/*
+	 * Free the exprcontext
+	 */
+	ExecFreeExprContext(&node->ss.ps);
+
+	/*
+	 * clean out the tuple table
+	 */
+	ExecClearTuple(node->ss.ps.ps_ResultTupleSlot);
+	ExecClearTuple(node->ss.ss_ScanTupleSlot);
+
+	/*
+	 * close heap scan
+	 */
+	if (scanDesc)
+		heap_endscan(scanDesc);
+
+	/*
+	 * close the heap relation.
+	 */
+	ExecCloseScanRelation(relation);
+}
+
+/* ----------------------------------------------------------------
+ *						Join Support
+ * ----------------------------------------------------------------
+ */
+
+/* ----------------------------------------------------------------
+ *		ExecReScanPartialSeqScan
+ *
+ *		Rescans the relation.
+ * ----------------------------------------------------------------
+ */
+void
+ExecReScanPartialSeqScan(PartialSeqScanState *node)
+{
+	/* make the next ExecPartialSeqScan call set up the heap scan again */
+	node->scan_initialized = false;
+
+	ExecScanReScan((ScanState *) node);
+}
diff --git a/src/backend/executor/nodeResult.c b/src/backend/executor/nodeResult.c
index 8d3dde0..b348bfd 100644
--- a/src/backend/executor/nodeResult.c
+++ b/src/backend/executor/nodeResult.c
@@ -75,6 +75,13 @@ ExecResult(ResultState *node)
 	econtext = node->ps.ps_ExprContext;
 
 	/*
+	 * Result node can be added as a gating node on top of PartialSeqScan
+	 * node, so need to percolate toc information to outer node.
+ */ + if (node->ps.toc) + outerPlanState(node)->toc = node->ps.toc; + + /* * check constant qualifications like (2 > 1), if not already done */ if (node->rs_checkqual) diff --git a/src/backend/nodes/copyfuncs.c b/src/backend/nodes/copyfuncs.c index ec605bd..e62fd14 100644 --- a/src/backend/nodes/copyfuncs.c +++ b/src/backend/nodes/copyfuncs.c @@ -382,6 +382,22 @@ _copySampleScan(const SampleScan *from) } /* + * _copyPartialSeqScan + */ +static PartialSeqScan * +_copyPartialSeqScan(const SeqScan *from) +{ + PartialSeqScan *newnode = makeNode(PartialSeqScan); + + /* + * copy node superclass fields + */ + CopyScanFields((const Scan *) from, (Scan *) newnode); + + return newnode; +} + +/* * _copyFunnel */ static Funnel * @@ -4261,6 +4277,9 @@ copyObject(const void *from) case T_SampleScan: retval = _copySampleScan(from); break; + case T_PartialSeqScan: + retval = _copyPartialSeqScan(from); + break; case T_Funnel: retval = _copyFunnel(from); break; diff --git a/src/backend/nodes/outfuncs.c b/src/backend/nodes/outfuncs.c index bc9b481..8a145d9 100644 --- a/src/backend/nodes/outfuncs.c +++ b/src/backend/nodes/outfuncs.c @@ -458,6 +458,14 @@ _outSampleScan(StringInfo str, const SampleScan *node) } static void +_outPartialSeqScan(StringInfo str, const SeqScan *node) +{ + WRITE_NODE_TYPE("PARTIALSEQSCAN"); + + _outScanInfo(str, (const Scan *) node); +} + +static void _outFunnel(StringInfo str, const Funnel *node) { WRITE_NODE_TYPE("FUNNEL"); @@ -3018,6 +3026,9 @@ _outNode(StringInfo str, const void *obj) case T_SampleScan: _outSampleScan(str, obj); break; + case T_PartialSeqScan: + _outPartialSeqScan(str, obj); + break; case T_Funnel: _outFunnel(str, obj); break; diff --git a/src/backend/nodes/readfuncs.c b/src/backend/nodes/readfuncs.c index ef88209..95a8503 100644 --- a/src/backend/nodes/readfuncs.c +++ b/src/backend/nodes/readfuncs.c @@ -1665,6 +1665,19 @@ _readSampleScan(void) } /* + * _readPartialSeqScan + */ +static PartialSeqScan * +_readPartialSeqScan(void) +{ + 
READ_LOCALS_NO_FIELDS(PartialSeqScan); + + ReadCommonScan(local_node); + + READ_DONE(); +} + +/* * _readIndexScan */ static IndexScan * @@ -2366,6 +2379,8 @@ parseNodeString(void) return_value = _readSeqScan(); else if (MATCH("SAMPLESCAN", 10)) return_value = _readSampleScan(); + else if (MATCH("PARTIALSEQSCAN", 14)) + return_value = _readPartialSeqScan(); else if (MATCH("INDEXSCAN", 9)) return_value = _readIndexScan(); else if (MATCH("INDEXONLYSCAN", 13)) diff --git a/src/backend/optimizer/path/Makefile b/src/backend/optimizer/path/Makefile index 6864a62..6e462b1 100644 --- a/src/backend/optimizer/path/Makefile +++ b/src/backend/optimizer/path/Makefile @@ -13,6 +13,6 @@ top_builddir = ../../../.. include $(top_builddir)/src/Makefile.global OBJS = allpaths.o clausesel.o costsize.o equivclass.o indxpath.o \ - joinpath.o joinrels.o pathkeys.o tidpath.o + joinpath.o joinrels.o pathkeys.o parallelpath.o tidpath.o include $(top_srcdir)/src/backend/common.mk diff --git a/src/backend/optimizer/path/allpaths.c b/src/backend/optimizer/path/allpaths.c index 8fc1cfd..c2ae95d 100644 --- a/src/backend/optimizer/path/allpaths.c +++ b/src/backend/optimizer/path/allpaths.c @@ -477,6 +477,9 @@ set_plain_rel_pathlist(PlannerInfo *root, RelOptInfo *rel, RangeTblEntry *rte) /* Consider sequential scan */ add_path(rel, create_seqscan_path(root, rel, required_outer)); + /* Consider parallel scans */ + create_parallelscan_paths(root, rel, required_outer); + /* Consider index scans */ create_index_paths(root, rel); diff --git a/src/backend/optimizer/path/costsize.c b/src/backend/optimizer/path/costsize.c index 0a40f6c..5e8cd56 100644 --- a/src/backend/optimizer/path/costsize.c +++ b/src/backend/optimizer/path/costsize.c @@ -296,6 +296,50 @@ cost_samplescan(Path *path, PlannerInfo *root, } /* + * cost_partialseqscan + * Determines and returns the cost of scanning a relation partially. 
+ * + * 'baserel' is the relation to be scanned + * 'param_info' is the ParamPathInfo if this is a parameterized path, else NULL + * 'nworkers' is the number of workers among which the work will be + * distributed + */ +void +cost_partialseqscan(Path *path, PlannerInfo *root, + RelOptInfo *baserel, ParamPathInfo *param_info, + int nworkers) +{ + Cost startup_cost = 0; + Cost run_cost = 0; + + cost_seqscan(path, root, baserel, param_info); + + startup_cost = path->startup_cost; + + run_cost = path->total_cost - startup_cost; + + /* + * Account for small cost for communication related to scan + * via the ParallelHeapScanDesc. + */ + run_cost += 0.01; + + /* + * Runtime cost will be equally shared by all workers. + * The assumption here is that disk access cost will also be + * equally shared between workers, which is generally true + * unless there are too many workers working on a relatively + * small number of blocks. If we come across any such case, + * then we can think of changing the current cost model for + * partial sequential scan. + */ + run_cost = run_cost / (nworkers + 1); + + path->startup_cost = startup_cost; + path->total_cost = startup_cost + run_cost; +} + +/* * cost_funnel * Determines and returns the cost of funnel path. * diff --git a/src/backend/optimizer/path/parallelpath.c b/src/backend/optimizer/path/parallelpath.c new file mode 100644 index 0000000..a5a25cd --- /dev/null +++ b/src/backend/optimizer/path/parallelpath.c @@ -0,0 +1,90 @@ +/*------------------------------------------------------------------------- + * + * parallelpath.c + * Routines to determine parallel paths for scanning a given relation. 
+ * + * + * Portions Copyright (c) 1996-2015, PostgreSQL Global Development Group + * Portions Copyright (c) 1994, Regents of the University of California + * + * + * IDENTIFICATION + * src/backend/optimizer/path/parallelpath.c + * + *------------------------------------------------------------------------- + */ +#include "postgres.h" + +#include "access/heapam.h" +#include "optimizer/cost.h" +#include "optimizer/pathnode.h" +#include "optimizer/paths.h" +#include "parser/parsetree.h" +#include "utils/rel.h" + + +/* + * create_parallelscan_paths + * Create paths corresponding to parallel scans of the given rel. + * Currently we only support partial sequential scan. + * + * Candidate paths are added to the rel's pathlist (using add_path). + */ +void +create_parallelscan_paths(PlannerInfo *root, RelOptInfo *rel, + Relids required_outer) +{ + int num_parallel_workers = 0; + int estimated_parallel_workers = 0; + Oid reloid; + Relation relation; + Path *subpath; + + /* + * Parallel scan is possible only if the user has set parallel_seqscan_degree + * to a value greater than 0 and the query is parallel-safe. + */ + if (parallel_seqscan_degree <= 0 || !root->glob->parallelModeOK) + return; + + /* + * There should be at least a thousand pages to scan for each worker. This + * number is somewhat arbitrary; however we don't want to spawn workers + * to scan smaller relations as that will be costly. + */ + estimated_parallel_workers = rel->pages / 1000; + + if (estimated_parallel_workers <= 0) + return; + + reloid = planner_rt_fetch(rel->relid, root)->relid; + + relation = heap_open(reloid, NoLock); + + /* + * Temporary relations can't be scanned by parallel workers as they are + * visible only to local sessions. 
+ */ + if (RelationUsesLocalBuffers(relation)) + { + heap_close(relation, NoLock); + return; + } + + heap_close(relation, NoLock); + + num_parallel_workers = Min(parallel_seqscan_degree, + estimated_parallel_workers); + + /* + * Create the partial scan path which each worker backend needs to + * execute. + */ + subpath = create_partialseqscan_path(root, rel, required_outer, + num_parallel_workers); + + /* Create the funnel path which master backend needs to execute. */ + add_path(rel, (Path *) create_funnel_path(root, rel, subpath, + required_outer, + num_parallel_workers)); +} diff --git a/src/backend/optimizer/plan/createplan.c b/src/backend/optimizer/plan/createplan.c index e987922..67d74db 100644 --- a/src/backend/optimizer/plan/createplan.c +++ b/src/backend/optimizer/plan/createplan.c @@ -60,6 +60,8 @@ static SeqScan *create_seqscan_plan(PlannerInfo *root, Path *best_path, List *tlist, List *scan_clauses); static SampleScan *create_samplescan_plan(PlannerInfo *root, Path *best_path, List *tlist, List *scan_clauses); +static Scan *create_partialseqscan_plan(PlannerInfo *root, Path *best_path, + List *tlist, List *scan_clauses); static Funnel *create_funnel_plan(PlannerInfo *root, FunnelPath *best_path); static Scan *create_indexscan_plan(PlannerInfo *root, IndexPath *best_path, @@ -106,6 +108,8 @@ static void copy_plan_costsize(Plan *dest, Plan *src); static SeqScan *make_seqscan(List *qptlist, List *qpqual, Index scanrelid); static SampleScan *make_samplescan(List *qptlist, List *qpqual, Index scanrelid, TableSampleClause *tsc); +static PartialSeqScan *make_partialseqscan(List *qptlist, List *qpqual, + Index scanrelid); static Funnel *make_funnel(List *qptlist, List *qpqual, Index scanrelid, int nworkers, Plan *subplan); @@ -239,6 +243,7 @@ create_plan_recurse(PlannerInfo *root, Path *best_path) { case T_SeqScan: case T_SampleScan: + case T_PartialSeqScan: case T_IndexScan: case T_IndexOnlyScan: case T_BitmapHeapScan: @@ -365,6 +370,13 @@ 
create_scan_plan(PlannerInfo *root, Path *best_path) scan_clauses); break; + case T_PartialSeqScan: + plan = (Plan *) create_partialseqscan_plan(root, + best_path, + tlist, + scan_clauses); + break; + case T_IndexScan: plan = (Plan *) create_indexscan_plan(root, (IndexPath *) best_path, @@ -569,6 +581,7 @@ disuse_physical_tlist(PlannerInfo *root, Plan *plan, Path *path) { case T_SeqScan: case T_SampleScan: + case T_PartialSeqScan: case T_Funnel: case T_IndexScan: case T_IndexOnlyScan: @@ -1204,6 +1217,46 @@ create_samplescan_plan(PlannerInfo *root, Path *best_path, } /* + * create_partialseqscan_plan + * + * Returns a partial seqscan plan for the base relation scanned by + * 'best_path' with restriction clauses 'scan_clauses' and targetlist + * 'tlist'. + */ +static Scan * +create_partialseqscan_plan(PlannerInfo *root, Path *best_path, + List *tlist, List *scan_clauses) +{ + Scan *scan_plan; + Index scan_relid = best_path->parent->relid; + + /* it should be a base rel... */ + Assert(scan_relid > 0); + Assert(best_path->parent->rtekind == RTE_RELATION); + + /* Sort clauses into best execution order */ + scan_clauses = order_qual_clauses(root, scan_clauses); + + /* Reduce RestrictInfo list to bare expressions; ignore pseudoconstants */ + scan_clauses = extract_actual_clauses(scan_clauses, false); + + /* Replace any outer-relation variables with nestloop params */ + if (best_path->param_info) + { + scan_clauses = (List *) + replace_nestloop_params(root, (Node *) scan_clauses); + } + + scan_plan = (Scan *) make_partialseqscan(tlist, + scan_clauses, + scan_relid); + + copy_path_costsize(&scan_plan->plan, best_path); + + return scan_plan; +} + +/* * create_funnel_plan * * Returns a funnel plan for the base relation scanned by @@ -3532,6 +3585,24 @@ make_samplescan(List *qptlist, return node; } +static PartialSeqScan * +make_partialseqscan(List *qptlist, + List *qpqual, + Index scanrelid) +{ + PartialSeqScan *node = makeNode(PartialSeqScan); + Plan *plan = &node->plan; + 
+ /* cost should be inserted by caller */ + plan->targetlist = qptlist; + plan->qual = qpqual; + plan->lefttree = NULL; + plan->righttree = NULL; + node->scanrelid = scanrelid; + + return node; +} + static Funnel * make_funnel(List *qptlist, List *qpqual, diff --git a/src/backend/optimizer/plan/setrefs.c b/src/backend/optimizer/plan/setrefs.c index 2cff5a9..90b611b 100644 --- a/src/backend/optimizer/plan/setrefs.c +++ b/src/backend/optimizer/plan/setrefs.c @@ -442,6 +442,7 @@ set_plan_refs(PlannerInfo *root, Plan *plan, int rtoffset) switch (nodeTag(plan)) { case T_SeqScan: + case T_PartialSeqScan: { SeqScan *splan = (SeqScan *) plan; @@ -2309,6 +2310,11 @@ fix_node_funcids(Plan *node) switch (nodeTag(node)) { + case T_Result: + fix_opfuncids((Node*) (((Result *)node)->resconstantqual)); + break; + case T_PartialSeqScan: + break; default: elog(ERROR, "unrecognized node type: %d", (int) nodeTag(node)); break; diff --git a/src/backend/optimizer/plan/subselect.c b/src/backend/optimizer/plan/subselect.c index 073a7f5..37b5909 100644 --- a/src/backend/optimizer/plan/subselect.c +++ b/src/backend/optimizer/plan/subselect.c @@ -2243,6 +2243,7 @@ finalize_plan(PlannerInfo *root, Plan *plan, Bitmapset *valid_params, context.paramids = bms_add_members(context.paramids, scan_params); break; + case T_PartialSeqScan: case T_Funnel: context.paramids = bms_add_members(context.paramids, scan_params); break; diff --git a/src/backend/optimizer/util/pathnode.c b/src/backend/optimizer/util/pathnode.c index c886075..628fb84 100644 --- a/src/backend/optimizer/util/pathnode.c +++ b/src/backend/optimizer/util/pathnode.c @@ -732,6 +732,28 @@ create_samplescan_path(PlannerInfo *root, RelOptInfo *rel, Relids required_outer } /* + * create_partialseqscan_path + * Creates a path corresponding to a partial sequential scan, returning the + * pathnode. 
+ */ +Path * +create_partialseqscan_path(PlannerInfo *root, RelOptInfo *rel, + Relids required_outer, int nworkers) +{ + Path *pathnode = makeNode(Path); + + pathnode->pathtype = T_PartialSeqScan; + pathnode->parent = rel; + pathnode->param_info = get_baserel_parampathinfo(root, rel, + required_outer); + pathnode->pathkeys = NIL; /* partialseqscan has unordered result */ + + cost_partialseqscan(pathnode, root, rel, pathnode->param_info, nworkers); + + return pathnode; +} + +/* * create_funnel_path * * Creates a path corresponding to a funnel scan, returning the diff --git a/src/include/access/heapam.h b/src/include/access/heapam.h index 75e6b72..ead8411 100644 --- a/src/include/access/heapam.h +++ b/src/include/access/heapam.h @@ -96,8 +96,9 @@ extern Relation heap_openrv_extended(const RangeVar *relation, #define heap_close(r,l) relation_close(r,l) -/* struct definition appears in relscan.h */ +/* struct definitions appear in relscan.h */ typedef struct HeapScanDescData *HeapScanDesc; +typedef struct ParallelHeapScanDescData *ParallelHeapScanDesc; /* * HeapScanIsValid @@ -121,11 +122,17 @@ extern void heap_setscanlimits(HeapScanDesc scan, BlockNumber startBlk, BlockNumber endBlk); extern void heapgetpage(HeapScanDesc scan, BlockNumber page); extern void heap_rescan(HeapScanDesc scan, ScanKey key); +extern void heap_parallel_rescan(ParallelHeapScanDesc pscan, HeapScanDesc scan); extern void heap_rescan_set_params(HeapScanDesc scan, ScanKey key, bool allow_strat, bool allow_sync, bool allow_pagemode); extern void heap_endscan(HeapScanDesc scan); extern HeapTuple heap_getnext(HeapScanDesc scan, ScanDirection direction); +extern Size heap_parallelscan_estimate(Snapshot snapshot); +extern void heap_parallelscan_initialize(ParallelHeapScanDesc target, + Relation relation, Snapshot snapshot); +extern HeapScanDesc heap_beginscan_parallel(Relation, ParallelHeapScanDesc); + extern bool heap_fetch(Relation relation, Snapshot snapshot, HeapTuple tuple, Buffer *userbuf, bool 
keep_buf, Relation stats_relation); diff --git a/src/include/access/relscan.h b/src/include/access/relscan.h index 6e62319..f962f83 100644 --- a/src/include/access/relscan.h +++ b/src/include/access/relscan.h @@ -20,6 +20,17 @@ #include "access/itup.h" #include "access/tupdesc.h" +/* Struct for parallel scan setup */ +typedef struct ParallelHeapScanDescData +{ + Oid phs_relid; + BlockNumber phs_nblocks; + slock_t phs_mutex; + BlockNumber phs_cblock; + BlockNumber phs_startblock; + bool phs_firstpass; + char phs_snapshot_data[FLEXIBLE_ARRAY_MEMBER]; +} ParallelHeapScanDescData; typedef struct HeapScanDescData { @@ -49,6 +60,7 @@ typedef struct HeapScanDescData BlockNumber rs_cblock; /* current block # in scan, if any */ Buffer rs_cbuf; /* current buffer in scan, if any */ /* NB: if rs_cbuf is not InvalidBuffer, we hold a pin on that buffer */ + ParallelHeapScanDesc rs_parallel; /* parallel scan information */ /* these fields only used in page-at-a-time mode and for bitmap scans */ int rs_cindex; /* current tuple's index in vistuples */ diff --git a/src/include/executor/nodePartialSeqscan.h b/src/include/executor/nodePartialSeqscan.h new file mode 100644 index 0000000..cec09ad --- /dev/null +++ b/src/include/executor/nodePartialSeqscan.h @@ -0,0 +1,25 @@ +/*------------------------------------------------------------------------- + * + * nodePartialSeqscan.h + * prototypes for nodePartialSeqscan.c + * + * + * Portions Copyright (c) 1996-2015, PostgreSQL Global Development Group + * Portions Copyright (c) 1994, Regents of the University of California + * + * src/include/executor/nodePartialSeqscan.h + * + *------------------------------------------------------------------------- + */ +#ifndef NODEPARTIALSEQSCAN_H +#define NODEPARTIALSEQSCAN_H + +#include "nodes/execnodes.h" + +extern PartialSeqScanState *ExecInitPartialSeqScan(PartialSeqScan *node, + EState *estate, int eflags); +extern TupleTableSlot *ExecPartialSeqScan(PartialSeqScanState *node); +extern void 
ExecEndPartialSeqScan(PartialSeqScanState *node); +extern void ExecReScanPartialSeqScan(PartialSeqScanState *node); + +#endif /* NODEPARTIALSEQSCAN_H */ diff --git a/src/include/nodes/execnodes.h b/src/include/nodes/execnodes.h index ee82999..508f69d 100644 --- a/src/include/nodes/execnodes.h +++ b/src/include/nodes/execnodes.h @@ -1282,6 +1282,16 @@ typedef struct SampleScanState } SampleScanState; /* + * PartialSeqScanState extends ScanState by storing additional information + * related to scan. + */ +typedef struct PartialSeqScanState +{ + ScanState ss; /* its first field is NodeTag */ + bool scan_initialized; /* used to determine if the scan is initialized */ +} PartialSeqScanState; + +/* * FunnelState extends ScanState by storing additional information * related to parallel workers. * pcxt parallel context for managing generic state information diff --git a/src/include/nodes/nodes.h b/src/include/nodes/nodes.h index 48f7160..039f053 100644 --- a/src/include/nodes/nodes.h +++ b/src/include/nodes/nodes.h @@ -52,6 +52,7 @@ typedef enum NodeTag T_Scan, T_SeqScan, T_SampleScan, + T_PartialSeqScan, T_Funnel, T_IndexScan, T_IndexOnlyScan, @@ -100,6 +101,7 @@ typedef enum NodeTag T_ScanState, T_SeqScanState, T_SampleScanState, + T_PartialSeqScanState, T_FunnelState, T_IndexScanState, T_IndexOnlyScanState, diff --git a/src/include/nodes/plannodes.h b/src/include/nodes/plannodes.h index 69302af..2d25a01 100644 --- a/src/include/nodes/plannodes.h +++ b/src/include/nodes/plannodes.h @@ -296,6 +296,12 @@ typedef struct SampleScan struct TableSampleClause *tablesample; } SampleScan; +/* ---------------- + * partial sequential scan node + * ---------------- + */ +typedef SeqScan PartialSeqScan; + /* ------------ * Funnel node * ------------ diff --git a/src/include/optimizer/cost.h b/src/include/optimizer/cost.h index 8ca7d0f..4ecafd4 100644 --- a/src/include/optimizer/cost.h +++ b/src/include/optimizer/cost.h @@ -80,6 +80,9 @@ extern void cost_seqscan(Path *path, 
PlannerInfo *root, RelOptInfo *baserel, ParamPathInfo *param_info); extern void cost_samplescan(Path *path, PlannerInfo *root, RelOptInfo *baserel, ParamPathInfo *param_info); +extern void cost_partialseqscan(Path *path, PlannerInfo *root, + RelOptInfo *baserel, ParamPathInfo *param_info, + int nworkers); extern void cost_funnel(FunnelPath *path, PlannerInfo *root, RelOptInfo *baserel, ParamPathInfo *param_info); extern void cost_index(IndexPath *path, PlannerInfo *root, diff --git a/src/include/optimizer/pathnode.h b/src/include/optimizer/pathnode.h index 29b344f..8c8a1a2 100644 --- a/src/include/optimizer/pathnode.h +++ b/src/include/optimizer/pathnode.h @@ -34,6 +34,8 @@ extern Path *create_seqscan_path(PlannerInfo *root, RelOptInfo *rel, Relids required_outer); extern Path *create_samplescan_path(PlannerInfo *root, RelOptInfo *rel, Relids required_outer); +extern Path *create_partialseqscan_path(PlannerInfo *root, RelOptInfo *rel, + Relids required_outer, int nworkers); extern FunnelPath *create_funnel_path(PlannerInfo *root, RelOptInfo *rel, Path *subpath, Relids required_outer, int nworkers); diff --git a/src/include/optimizer/paths.h b/src/include/optimizer/paths.h index 87123a5..e7db9ab 100644 --- a/src/include/optimizer/paths.h +++ b/src/include/optimizer/paths.h @@ -55,6 +55,14 @@ extern void debug_print_rel(PlannerInfo *root, RelOptInfo *rel); #endif /* + * parallelpath.c + * routines to generate parallel scan paths + */ + +extern void create_parallelscan_paths(PlannerInfo *root, RelOptInfo *rel, + Relids required_outer); + +/* * indxpath.c * routines to generate index paths */ diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list index ad2a33f..f06c7c2 100644 --- a/src/tools/pgindent/typedefs.list +++ b/src/tools/pgindent/typedefs.list @@ -1200,6 +1200,8 @@ PACE_HEADER PACL parallel_estimate_ctx ParallelStmt +PartialSeqScan +PartialSeqScanState PATH PBOOL PCtxtHandle