From 9543495bae9d486a81c15087b97bc1d4a1c631df Mon Sep 17 00:00:00 2001 From: Arseny Sher Date: Fri, 10 Mar 2017 21:52:01 +0300 Subject: [PATCH 4/8] Reversed SeqScan implementation. The main job is done by the heappushtups function, which iterates over tuples and pushes each one. It is mostly a copy of heapgettup_pagemode, which is left in place for compatibility. Per-tuple handling (checking quals, etc.) is implemented as inline functions in nodeSeqscan.h. Since heapam.h must now know about PlanState, some forward declarations were added, which is kind of ugly. EvalPlanQual is not supported. --- src/backend/access/heap/heapam.c | 255 ++++++++++++++++++++++++++++++++++++ src/backend/executor/execProcnode.c | 23 +++- src/backend/executor/nodeSeqscan.c | 75 +++-------- src/include/access/heapam.h | 8 ++ src/include/executor/nodeSeqscan.h | 121 ++++++++++++++++- 5 files changed, 424 insertions(+), 58 deletions(-) diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c index b147f6482c..b5ce8aff10 100644 --- a/src/backend/access/heap/heapam.c +++ b/src/backend/access/heap/heapam.c @@ -73,6 +73,8 @@ #include "utils/snapmgr.h" #include "utils/syscache.h" #include "utils/tqual.h" +#include "executor/executor.h" +#include "executor/nodeSeqscan.h" /* GUC variable */ @@ -9236,3 +9238,256 @@ heap_mask(char *pagedata, BlockNumber blkno) } } } + +/* ---------------- + * Fetch tuples, check quals and push them. Modified heapgettup_pagemode, + * a lot of copy-pasting. 
+ * This function in fact doesn't care about pusher type and func, + * although SeqScanState and inlined SeqPushHeapTuple are hardcoded for now + * ---------------- + */ +void +heappushtups(HeapScanDesc scan, + ScanDirection dir, + int nkeys, + ScanKey key, + SeqScanState *pusher) +{ + HeapTuple tuple = &(scan->rs_ctup); + bool backward = ScanDirectionIsBackward(dir); + BlockNumber page; + bool finished; + Page dp; + int lines; + int lineindex; + OffsetNumber lineoff; + int linesleft; + ItemId lpp; + + /* no movement is not supported for now */ + Assert(!ScanDirectionIsNoMovement(dir)); + + /* + * calculate next starting lineindex, given scan direction + */ + if (ScanDirectionIsForward(dir)) + { + if (!scan->rs_inited) + { + /* + * return null immediately if relation is empty + */ + if (scan->rs_nblocks == 0 || scan->rs_numblocks == 0) + { + Assert(!BufferIsValid(scan->rs_cbuf)); + tuple->t_data = NULL; + SeqPushNull(pusher); + return; + } + if (scan->rs_parallel != NULL) + { + page = heap_parallelscan_nextpage(scan); + + /* Other processes might have already finished the scan. 
*/ + if (page == InvalidBlockNumber) + { + Assert(!BufferIsValid(scan->rs_cbuf)); + tuple->t_data = NULL; + SeqPushNull(pusher); + return; + } + } + else + page = scan->rs_startblock; /* first page */ + heapgetpage(scan, page); + lineindex = 0; + scan->rs_inited = true; + } + else + { + /* continue from previously returned page/tuple */ + page = scan->rs_cblock; /* current page */ + lineindex = scan->rs_cindex + 1; + } + + dp = BufferGetPage(scan->rs_cbuf); + TestForOldSnapshot(scan->rs_snapshot, scan->rs_rd, dp); + lines = scan->rs_ntuples; + /* page and lineindex now reference the next visible tid */ + + linesleft = lines - lineindex; + } + else /* backward */ + { + /* backward parallel scan not supported */ + Assert(scan->rs_parallel == NULL); + + if (!scan->rs_inited) + { + /* + * return null immediately if relation is empty + */ + if (scan->rs_nblocks == 0 || scan->rs_numblocks == 0) + { + Assert(!BufferIsValid(scan->rs_cbuf)); + tuple->t_data = NULL; + SeqPushNull(pusher); + return; + } + + /* + * Disable reporting to syncscan logic in a backwards scan; it's + * not very likely anyone else is doing the same thing at the same + * time, and much more likely that we'll just bollix things for + * forward scanners. 
+ */ + scan->rs_syncscan = false; + /* start from last page of the scan */ + if (scan->rs_startblock > 0) + page = scan->rs_startblock - 1; + else + page = scan->rs_nblocks - 1; + heapgetpage(scan, page); + } + else + { + /* continue from previously returned page/tuple */ + page = scan->rs_cblock; /* current page */ + } + + dp = BufferGetPage(scan->rs_cbuf); + TestForOldSnapshot(scan->rs_snapshot, scan->rs_rd, dp); + lines = scan->rs_ntuples; + + if (!scan->rs_inited) + { + lineindex = lines - 1; + scan->rs_inited = true; + } + else + { + lineindex = scan->rs_cindex - 1; + } + /* page and lineindex now reference the previous visible tid */ + + linesleft = lineindex + 1; + } + + /* + * advance the scan until we find a qualifying tuple or run out of stuff + * to scan + */ + for (;;) + { + while (linesleft > 0) + { + bool tuple_qualifies = false; + + lineoff = scan->rs_vistuples[lineindex]; + lpp = PageGetItemId(dp, lineoff); + Assert(ItemIdIsNormal(lpp)); + + tuple->t_data = (HeapTupleHeader) PageGetItem((Page) dp, lpp); + tuple->t_len = ItemIdGetLength(lpp); + ItemPointerSet(&(tuple->t_self), page, lineoff); + + /* + * if current tuple qualifies, push it. + */ + if (key != NULL) + { + HeapKeyTest(tuple, RelationGetDescr(scan->rs_rd), + nkeys, key, tuple_qualifies); + } + else + { + tuple_qualifies = true; + } + + if (tuple_qualifies) + { + /* Push tuple */ + scan->rs_cindex = lineindex; + pgstat_count_heap_getnext(scan->rs_rd); + if (!SeqPushHeapTuple(tuple, pusher)) + return; + } + + /* + * and carry on to the next one anyway + */ + --linesleft; + if (backward) + --lineindex; + else + ++lineindex; + } + + /* + * if we get here, it means we've exhausted the items on this page and + * it's time to move to the next. + */ + if (backward) + { + finished = (page == scan->rs_startblock) || + (scan->rs_numblocks != InvalidBlockNumber ? 
--scan->rs_numblocks == 0 : false); + if (page == 0) + page = scan->rs_nblocks; + page--; + } + else if (scan->rs_parallel != NULL) + { + page = heap_parallelscan_nextpage(scan); + finished = (page == InvalidBlockNumber); + } + else + { + page++; + if (page >= scan->rs_nblocks) + page = 0; + finished = (page == scan->rs_startblock) || + (scan->rs_numblocks != InvalidBlockNumber ? --scan->rs_numblocks == 0 : false); + + /* + * Report our new scan position for synchronization purposes. We + * don't do that when moving backwards, however. That would just + * mess up any other forward-moving scanners. + * + * Note: we do this before checking for end of scan so that the + * final state of the position hint is back at the start of the + * rel. That's not strictly necessary, but otherwise when you run + * the same query multiple times the starting position would shift + * a little bit backwards on every invocation, which is confusing. + * We don't guarantee any specific ordering in general, though. 
+ */ + if (scan->rs_syncscan) + ss_report_location(scan->rs_rd, page); + } + + /* + * return NULL if we've exhausted all the pages + */ + if (finished) + { + if (BufferIsValid(scan->rs_cbuf)) + ReleaseBuffer(scan->rs_cbuf); + scan->rs_cbuf = InvalidBuffer; + scan->rs_cblock = InvalidBlockNumber; + tuple->t_data = NULL; + scan->rs_inited = false; + SeqPushNull(pusher); + return; + } + + heapgetpage(scan, page); + + dp = BufferGetPage(scan->rs_cbuf); + TestForOldSnapshot(scan->rs_snapshot, scan->rs_rd, dp); + lines = scan->rs_ntuples; + linesleft = lines; + if (backward) + lineindex = lines - 1; + else + lineindex = 0; + } +} diff --git a/src/backend/executor/execProcnode.c b/src/backend/executor/execProcnode.c index 5955f84d86..1b81e30cd3 100644 --- a/src/backend/executor/execProcnode.c +++ b/src/backend/executor/execProcnode.c @@ -174,6 +174,13 @@ ExecInitNode(Plan *node, EState *estate, int eflags, PlanState *parent) switch (nodeTag(node)) { + /* + * scan nodes + */ + case T_SeqScan: + result = (PlanState *) ExecInitSeqScan((SeqScan *) node, + estate, eflags, parent); + break; default: elog(ERROR, "unrecognized/unsupported node type: %d", (int) nodeTag(node)); @@ -208,6 +215,10 @@ ExecLeaf(PlanState *node) switch (nodeTag(node)) { + case T_SeqScanState: + ExecSeqScan((SeqScanState *) node); + break; + default: elog(ERROR, "bottom node type not supported: %d", (int) nodeTag(node)); @@ -250,7 +261,7 @@ ExecPushTuple(TupleTableSlot *slot, PlanState *pusher) } /* - * Signal the parent that we are done. Like in ExecPushTuple, sender is param + * Signal parent that we are done. Like in ExecPushTuple, sender is param * here because we need to distinguish inner and outer pushes. * * 'slot' must be null tuple. 
It exists to be able to transfer correct @@ -271,7 +282,8 @@ ExecPushNull(TupleTableSlot *slot, PlanState *pusher) */ if (receiver == NULL) { - SendReadyTuple(NULL, pusher); + SendReadyTuple(slot, pusher); + return; } elog(ERROR, "node type not supported: %d", (int) nodeTag(receiver)); @@ -316,6 +328,13 @@ ExecEndNode(PlanState *node) switch (nodeTag(node)) { + /* + * scan nodes + */ + case T_SeqScanState: + ExecEndSeqScan((SeqScanState *) node); + break; + default: elog(ERROR, "unrecognized/unsupported node type: %d", (int) nodeTag(node)); diff --git a/src/backend/executor/nodeSeqscan.c b/src/backend/executor/nodeSeqscan.c index e61895de0a..babd8f07b1 100644 --- a/src/backend/executor/nodeSeqscan.c +++ b/src/backend/executor/nodeSeqscan.c @@ -15,7 +15,7 @@ /* * INTERFACE ROUTINES * ExecSeqScan sequentially scans a relation. - * ExecSeqNext retrieve next tuple in sequential order. + * pushTupleToSeqScan pushes all tuples to parent node * ExecInitSeqScan creates and initializes a seqscan node. * ExecEndSeqScan releases any storage allocated. * ExecReScanSeqScan rescans the relation @@ -30,29 +30,25 @@ #include "executor/execdebug.h" #include "executor/nodeSeqscan.h" #include "utils/rel.h" +#include "access/heapam.h" static void InitScanRelation(SeqScanState *node, EState *estate, int eflags); -static TupleTableSlot *SeqNext(SeqScanState *node); /* ---------------------------------------------------------------- * Scan Support * ---------------------------------------------------------------- */ -/* ---------------------------------------------------------------- - * SeqNext - * - * This is a workhorse for ExecSeqScan - * ---------------------------------------------------------------- +/* + * Push scanned tuples to the parent. Stop when all tuples are pushed or + * the parent told us to stop pushing. 
*/ -static TupleTableSlot * -SeqNext(SeqScanState *node) +void +ExecSeqScan(SeqScanState *node) { - HeapTuple tuple; - HeapScanDesc scandesc; EState *estate; + HeapScanDesc scandesc; ScanDirection direction; - TupleTableSlot *slot; /* * get information from the estate and scan state @@ -60,8 +56,11 @@ SeqNext(SeqScanState *node) scandesc = node->ss.ss_currentScanDesc; estate = node->ss.ps.state; direction = estate->es_direction; - slot = node->ss.ss_ScanTupleSlot; + /* ExecScanFetch not implemented */ + Assert(estate->es_epqTuple == NULL); + + /* create scandesc, part of old SeqNext before heap_getnext */ if (scandesc == NULL) { /* @@ -73,30 +72,15 @@ SeqNext(SeqScanState *node) 0, NULL); node->ss.ss_currentScanDesc = scandesc; } + Assert(scandesc); - /* - * get the next tuple from the table - */ - tuple = heap_getnext(scandesc, direction); + /* not-page-at-time not supported for now */ + Assert(scandesc->rs_pageatatime); + heappushtups(scandesc, direction, + scandesc->rs_nkeys, + scandesc->rs_key, + node); - /* - * save the tuple and the buffer returned to us by the access methods in - * our scan tuple slot and return the slot. Note: we pass 'false' because - * tuples returned by heap_getnext() are pointers onto disk pages and were - * not created with palloc() and so should not be pfree()'d. Note also - * that ExecStoreTuple will increment the refcount of the buffer; the - * refcount will not be dropped until the tuple table slot is cleared. - */ - if (tuple) - ExecStoreTuple(tuple, /* tuple to store */ - slot, /* slot to store in */ - scandesc->rs_cbuf, /* buffer associated with this - * tuple */ - false); /* don't pfree this pointer */ - else - ExecClearTuple(slot); - - return slot; } /* @@ -113,23 +97,6 @@ SeqRecheck(SeqScanState *node, TupleTableSlot *slot) } /* ---------------------------------------------------------------- - * ExecSeqScan(node) - * - * Scans the relation sequentially and returns the next qualifying - * tuple. 
- * We call the ExecScan() routine and pass it the appropriate - * access method functions. - * ---------------------------------------------------------------- - */ -TupleTableSlot * -ExecSeqScan(SeqScanState *node) -{ - return ExecScan((ScanState *) node, - (ExecScanAccessMtd) SeqNext, - (ExecScanRecheckMtd) SeqRecheck); -} - -/* ---------------------------------------------------------------- * InitScanRelation * * Set up to access the scan relation. @@ -154,13 +121,12 @@ InitScanRelation(SeqScanState *node, EState *estate, int eflags) ExecAssignScanType(&node->ss, RelationGetDescr(currentRelation)); } - /* ---------------------------------------------------------------- * ExecInitSeqScan * ---------------------------------------------------------------- */ SeqScanState * -ExecInitSeqScan(SeqScan *node, EState *estate, int eflags) +ExecInitSeqScan(SeqScan *node, EState *estate, int eflags, PlanState *parent) { SeqScanState *scanstate; @@ -177,6 +143,7 @@ ExecInitSeqScan(SeqScan *node, EState *estate, int eflags) scanstate = makeNode(SeqScanState); scanstate->ss.ps.plan = (Plan *) node; scanstate->ss.ps.state = estate; + scanstate->ss.ps.parent = parent; /* * Miscellaneous initialization diff --git a/src/include/access/heapam.h b/src/include/access/heapam.h index 7e85510d2f..74097ffe50 100644 --- a/src/include/access/heapam.h +++ b/src/include/access/heapam.h @@ -126,6 +126,14 @@ extern void heap_rescan_set_params(HeapScanDesc scan, ScanKey key, bool allow_strat, bool allow_sync, bool allow_pagemode); extern void heap_endscan(HeapScanDesc scan); extern HeapTuple heap_getnext(HeapScanDesc scan, ScanDirection direction); +/* forward decls because now we need to know about PlanState */ +typedef struct PlanState PlanState; +typedef struct SeqScanState SeqScanState; +extern void heappushtups(HeapScanDesc scan, + ScanDirection dir, + int nkeys, + ScanKey key, + SeqScanState *pusher); extern Size heap_parallelscan_estimate(Snapshot snapshot); extern void 
heap_parallelscan_initialize(ParallelHeapScanDesc target, diff --git a/src/include/executor/nodeSeqscan.h b/src/include/executor/nodeSeqscan.h index 92b305e138..f7d69296a9 100644 --- a/src/include/executor/nodeSeqscan.h +++ b/src/include/executor/nodeSeqscan.h @@ -15,10 +15,15 @@ #define NODESEQSCAN_H #include "access/parallel.h" +#include "access/relscan.h" #include "nodes/execnodes.h" +#include "executor/executor.h" +#include "utils/memutils.h" +#include "miscadmin.h" -extern SeqScanState *ExecInitSeqScan(SeqScan *node, EState *estate, int eflags); -extern TupleTableSlot *ExecSeqScan(SeqScanState *node); +extern SeqScanState *ExecInitSeqScan(SeqScan *node, EState *estate, int eflags, + PlanState *parent); +extern void ExecSeqScan(SeqScanState *node); extern void ExecEndSeqScan(SeqScanState *node); extern void ExecReScanSeqScan(SeqScanState *node); @@ -27,4 +32,116 @@ extern void ExecSeqScanEstimate(SeqScanState *node, ParallelContext *pcxt); extern void ExecSeqScanInitializeDSM(SeqScanState *node, ParallelContext *pcxt); extern void ExecSeqScanInitializeWorker(SeqScanState *node, shm_toc *toc); +/* inline functions decls and implementations */ +static inline void SeqPushNull(SeqScanState *pusher); +static inline bool SeqPushHeapTuple(HeapTuple tuple, SeqScanState *pusher); + +/* push NULL to the parent, signaling that we are done */ +static inline void +SeqPushNull(SeqScanState *pusher) +{ + ProjectionInfo *projInfo; + TupleTableSlot *slot; + + projInfo = pusher->ss.ps.ps_ProjInfo; + slot = pusher->ss.ss_ScanTupleSlot; + + ExecClearTuple(slot); + /* + * being careful to use the projection result slot so it has correct + * tupleDesc. + */ + if (projInfo) + ExecPushNull(ExecClearTuple(projInfo->pi_slot), (PlanState *) pusher); + else + ExecPushNull(slot, (PlanState *) pusher); +} + +/* Push ready HeapTuple from SeqScanState + * + * Check qual for the tuple and push it. Tuple must be not NULL. 
+ * Returns true, if parent accepts more tuples, false otherwise + */ +static inline bool SeqPushHeapTuple(HeapTuple tuple, SeqScanState *pusher) +{ + HeapScanDesc scandesc; + ExprContext *econtext; + List *qual; + ProjectionInfo *projInfo; + TupleTableSlot *slot; + + Assert(tuple->t_data != NULL); + + /* + * Fetch data from node + */ + qual = pusher->ss.ps.qual; + projInfo = pusher->ss.ps.ps_ProjInfo; + econtext = pusher->ss.ps.ps_ExprContext; + scandesc = pusher->ss.ss_currentScanDesc; + slot = pusher->ss.ss_ScanTupleSlot; + + CHECK_FOR_INTERRUPTS(); + + /* + * save the tuple and the buffer returned to us by the access methods in + * our scan tuple slot. Note: we pass 'false' because tuples returned by + * heap_getnext() are pointers onto disk pages and were not created with + * palloc() and so should not be pfree()'d. Note also that ExecStoreTuple + * will increment the refcount of the buffer; the refcount will not be + * dropped until the tuple table slot is cleared. + */ + ExecStoreTuple(tuple, /* tuple to store */ + slot, /* slot to store in */ + scandesc->rs_cbuf, /* buffer associated with this + * tuple */ + false); /* don't pfree this pointer */ + + /* + * If we have neither a qual to check nor a projection to do, just skip + * all the overhead and push the raw scan tuple. + */ + if (!qual && !projInfo) + { + return ExecPushTuple(slot, (PlanState *) pusher); + } + + ResetExprContext(econtext); + /* + * place the current tuple into the expr context + */ + econtext->ecxt_scantuple = slot; + + /* + * check that the current tuple satisfies the qual-clause + * + * check for non-nil qual here to avoid a function call to ExecQual() + * when the qual is nil ... saves only a few cycles, but they add up + * ... + */ + if (!qual || ExecQual(qual, econtext, false)) + { + /* + * Found a satisfactory scan tuple. 
+ */ + if (projInfo) + { + /* + * Form a projection tuple, store it in the result tuple slot + * and push + */ + return ExecPushTuple(ExecProject(projInfo), (PlanState *) pusher); + } + /* + * Here, we aren't projecting, so just push scan tuple. + */ + return ExecPushTuple(slot, (PlanState *) pusher); + } + else + InstrCountFiltered1(pusher, 1); + + return true; +} + + #endif /* NODESEQSCAN_H */ -- 2.11.0