From d99cead6b8eb28fe238d918aa0333389413aab77 Mon Sep 17 00:00:00 2001 From: Arseny Sher Date: Fri, 10 Mar 2017 21:52:01 +0300 Subject: [PATCH 4/8] Reversed SeqScan implementation. Main job is done by heappushtups func which iterates over tuples and pushes each. It is mostly a copy of heapgettup_pagemode, which is left for compatibility. Per-tuple handling (checking quals, etc) is implemented as inline functions in nodeSeqscan.h. Since heapam.h must now know about PlanState, some forward decls were added, kind of ugly. EvalPlanQual is not supported. --- src/backend/access/heap/heapam.c | 256 ++++++++++++++++++++++++++++++++++++ src/backend/executor/execProcnode.c | 17 +++ src/backend/executor/nodeSeqscan.c | 76 ++++------- src/include/access/heapam.h | 9 ++ src/include/executor/nodeSeqscan.h | 149 ++++++++++++++++++++- 5 files changed, 451 insertions(+), 56 deletions(-) diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c index 85261379b1..0e6eafd44f 100644 --- a/src/backend/access/heap/heapam.c +++ b/src/backend/access/heap/heapam.c @@ -73,6 +73,8 @@ #include "utils/snapmgr.h" #include "utils/syscache.h" #include "utils/tqual.h" +#include "executor/executor.h" +#include "executor/nodeSeqscan.h" /* GUC variable */ @@ -9236,3 +9238,257 @@ heap_mask(char *pagedata, BlockNumber blkno) } } } + +/* ---------------- + * Fetch tuples, check quals and push them. Modified heapgettup_pagemode, + * a lot of copy-pasting. 
+ * This function in fact doesn't care about pusher type and func, + * although SeqScanState and inlined SeqPushHeapTuple is hardcoded for now + * ---------------- + */ +void +heappushtups(HeapScanDesc scan, + ScanDirection dir, + int nkeys, + ScanKey key, + PlanState *node, + SeqScanState *pusher) +{ + HeapTuple tuple = &(scan->rs_ctup); + bool backward = ScanDirectionIsBackward(dir); + BlockNumber page; + bool finished; + Page dp; + int lines; + int lineindex; + OffsetNumber lineoff; + int linesleft; + ItemId lpp; + + /* no movement is not supported for now */ + Assert(!ScanDirectionIsNoMovement(dir)); + + /* + * calculate next starting lineindex, given scan direction + */ + if (ScanDirectionIsForward(dir)) + { + if (!scan->rs_inited) + { + /* + * return null immediately if relation is empty + */ + if (scan->rs_nblocks == 0 || scan->rs_numblocks == 0) + { + Assert(!BufferIsValid(scan->rs_cbuf)); + tuple->t_data = NULL; + SeqPushHeapTuple(&(scan->rs_ctup), node, pusher); + return; + } + if (scan->rs_parallel != NULL) + { + page = heap_parallelscan_nextpage(scan); + + /* Other processes might have already finished the scan. 
*/ + if (page == InvalidBlockNumber) + { + Assert(!BufferIsValid(scan->rs_cbuf)); + tuple->t_data = NULL; + SeqPushHeapTuple(&(scan->rs_ctup), node, pusher); + return; + } + } + else + page = scan->rs_startblock; /* first page */ + heapgetpage(scan, page); + lineindex = 0; + scan->rs_inited = true; + } + else + { + /* continue from previously returned page/tuple */ + page = scan->rs_cblock; /* current page */ + lineindex = scan->rs_cindex + 1; + } + + dp = BufferGetPage(scan->rs_cbuf); + TestForOldSnapshot(scan->rs_snapshot, scan->rs_rd, dp); + lines = scan->rs_ntuples; + /* page and lineindex now reference the next visible tid */ + + linesleft = lines - lineindex; + } + else /* backward */ + { + /* backward parallel scan not supported */ + Assert(scan->rs_parallel == NULL); + + if (!scan->rs_inited) + { + /* + * return null immediately if relation is empty + */ + if (scan->rs_nblocks == 0 || scan->rs_numblocks == 0) + { + Assert(!BufferIsValid(scan->rs_cbuf)); + tuple->t_data = NULL; + SeqPushHeapTuple(&(scan->rs_ctup), node, pusher); + return; + } + + /* + * Disable reporting to syncscan logic in a backwards scan; it's + * not very likely anyone else is doing the same thing at the same + * time, and much more likely that we'll just bollix things for + * forward scanners. 
+ */ + scan->rs_syncscan = false; + /* start from last page of the scan */ + if (scan->rs_startblock > 0) + page = scan->rs_startblock - 1; + else + page = scan->rs_nblocks - 1; + heapgetpage(scan, page); + } + else + { + /* continue from previously returned page/tuple */ + page = scan->rs_cblock; /* current page */ + } + + dp = BufferGetPage(scan->rs_cbuf); + TestForOldSnapshot(scan->rs_snapshot, scan->rs_rd, dp); + lines = scan->rs_ntuples; + + if (!scan->rs_inited) + { + lineindex = lines - 1; + scan->rs_inited = true; + } + else + { + lineindex = scan->rs_cindex - 1; + } + /* page and lineindex now reference the previous visible tid */ + + linesleft = lineindex + 1; + } + + /* + * advance the scan until we find a qualifying tuple or run out of stuff + * to scan + */ + for (;;) + { + while (linesleft > 0) + { + bool tuple_qualifies = false; + + lineoff = scan->rs_vistuples[lineindex]; + lpp = PageGetItemId(dp, lineoff); + Assert(ItemIdIsNormal(lpp)); + + tuple->t_data = (HeapTupleHeader) PageGetItem((Page) dp, lpp); + tuple->t_len = ItemIdGetLength(lpp); + ItemPointerSet(&(tuple->t_self), page, lineoff); + + /* + * if current tuple qualifies, push it. + */ + if (key != NULL) + { + HeapKeyTest(tuple, RelationGetDescr(scan->rs_rd), + nkeys, key, tuple_qualifies); + } + else + { + tuple_qualifies = true; + } + + if (tuple_qualifies) + { + /* Push tuple */ + scan->rs_cindex = lineindex; + pgstat_count_heap_getnext(scan->rs_rd); + if (!SeqPushHeapTuple(&(scan->rs_ctup), node, pusher)) + return; + } + + /* + * and carry on to the next one anyway + */ + --linesleft; + if (backward) + --lineindex; + else + ++lineindex; + } + + /* + * if we get here, it means we've exhausted the items on this page and + * it's time to move to the next. + */ + if (backward) + { + finished = (page == scan->rs_startblock) || + (scan->rs_numblocks != InvalidBlockNumber ? 
--scan->rs_numblocks == 0 : false); + if (page == 0) + page = scan->rs_nblocks; + page--; + } + else if (scan->rs_parallel != NULL) + { + page = heap_parallelscan_nextpage(scan); + finished = (page == InvalidBlockNumber); + } + else + { + page++; + if (page >= scan->rs_nblocks) + page = 0; + finished = (page == scan->rs_startblock) || + (scan->rs_numblocks != InvalidBlockNumber ? --scan->rs_numblocks == 0 : false); + + /* + * Report our new scan position for synchronization purposes. We + * don't do that when moving backwards, however. That would just + * mess up any other forward-moving scanners. + * + * Note: we do this before checking for end of scan so that the + * final state of the position hint is back at the start of the + * rel. That's not strictly necessary, but otherwise when you run + * the same query multiple times the starting position would shift + * a little bit backwards on every invocation, which is confusing. + * We don't guarantee any specific ordering in general, though. 
+ */ + if (scan->rs_syncscan) + ss_report_location(scan->rs_rd, page); + } + + /* + * return NULL if we've exhausted all the pages + */ + if (finished) + { + if (BufferIsValid(scan->rs_cbuf)) + ReleaseBuffer(scan->rs_cbuf); + scan->rs_cbuf = InvalidBuffer; + scan->rs_cblock = InvalidBlockNumber; + tuple->t_data = NULL; + scan->rs_inited = false; + SeqPushHeapTuple(&(scan->rs_ctup), node, pusher); + return; + } + + heapgetpage(scan, page); + + dp = BufferGetPage(scan->rs_cbuf); + TestForOldSnapshot(scan->rs_snapshot, scan->rs_rd, dp); + lines = scan->rs_ntuples; + linesleft = lines; + if (backward) + lineindex = lines - 1; + else + lineindex = 0; + } +} diff --git a/src/backend/executor/execProcnode.c b/src/backend/executor/execProcnode.c index a95cfe5430..b0468667bb 100644 --- a/src/backend/executor/execProcnode.c +++ b/src/backend/executor/execProcnode.c @@ -149,6 +149,13 @@ ExecInitNode(Plan *node, EState *estate, int eflags, PlanState *parent) switch (nodeTag(node)) { + /* + * scan nodes + */ + case T_SeqScan: + result = (PlanState *) ExecInitSeqScan((SeqScan *) node, + estate, eflags, parent); + break; default: elog(ERROR, "unrecognized/unsupported node type: %d", (int) nodeTag(node)); @@ -211,6 +218,9 @@ pushTuple(TupleTableSlot *slot, PlanState *node, PlanState *pusher) { switch (nodeTag(node)) { + case T_SeqScanState: + return pushTupleToSeqScan((SeqScanState *) node); + default: elog(ERROR, "bottom node type not supported: %d", (int) nodeTag(node)); @@ -263,6 +273,13 @@ ExecEndNode(PlanState *node) switch (nodeTag(node)) { + /* + * scan nodes + */ + case T_SeqScanState: + ExecEndSeqScan((SeqScanState *) node); + break; + default: elog(ERROR, "unrecognized/unsupported node type: %d", (int) nodeTag(node)); diff --git a/src/backend/executor/nodeSeqscan.c b/src/backend/executor/nodeSeqscan.c index e61895de0a..8c0aa44f0f 100644 --- a/src/backend/executor/nodeSeqscan.c +++ b/src/backend/executor/nodeSeqscan.c @@ -15,7 +15,7 @@ /* * INTERFACE ROUTINES * 
ExecSeqScan sequentially scans a relation. - * ExecSeqNext retrieve next tuple in sequential order. + * pushTupleToSeqScan pushes all tuples to parent node * ExecInitSeqScan creates and initializes a seqscan node. * ExecEndSeqScan releases any storage allocated. * ExecReScanSeqScan rescans the relation @@ -30,29 +30,24 @@ #include "executor/execdebug.h" #include "executor/nodeSeqscan.h" #include "utils/rel.h" +#include "access/heapam.h" static void InitScanRelation(SeqScanState *node, EState *estate, int eflags); -static TupleTableSlot *SeqNext(SeqScanState *node); /* ---------------------------------------------------------------- * Scan Support * ---------------------------------------------------------------- */ -/* ---------------------------------------------------------------- - * SeqNext - * - * This is a workhorse for ExecSeqScan - * ---------------------------------------------------------------- +/* Push scanned tuples to the parent. Stop when all tuples are pushed or + * the parent told us to stop pushing. 
*/ -static TupleTableSlot * -SeqNext(SeqScanState *node) +bool +pushTupleToSeqScan(SeqScanState *node) { - HeapTuple tuple; - HeapScanDesc scandesc; EState *estate; + HeapScanDesc scandesc; ScanDirection direction; - TupleTableSlot *slot; /* * get information from the estate and scan state @@ -60,8 +55,11 @@ SeqNext(SeqScanState *node) scandesc = node->ss.ss_currentScanDesc; estate = node->ss.ps.state; direction = estate->es_direction; - slot = node->ss.ss_ScanTupleSlot; + /* ExecScanFetch not implemented */ + Assert(estate->es_epqTuple == NULL); + + /* create scandesc, part of old SeqNext before heap_getnext */ if (scandesc == NULL) { /* @@ -73,30 +71,17 @@ SeqNext(SeqScanState *node) 0, NULL); node->ss.ss_currentScanDesc = scandesc; } + Assert(scandesc); - /* - * get the next tuple from the table - */ - tuple = heap_getnext(scandesc, direction); + /* not-page-at-time not supported for now */ + Assert(scandesc->rs_pageatatime); + heappushtups(scandesc, direction, + scandesc->rs_nkeys, + scandesc->rs_key, + node->ss.ps.parent, + node); - /* - * save the tuple and the buffer returned to us by the access methods in - * our scan tuple slot and return the slot. Note: we pass 'false' because - * tuples returned by heap_getnext() are pointers onto disk pages and were - * not created with palloc() and so should not be pfree()'d. Note also - * that ExecStoreTuple will increment the refcount of the buffer; the - * refcount will not be dropped until the tuple table slot is cleared. 
- */ - if (tuple) - ExecStoreTuple(tuple, /* tuple to store */ - slot, /* slot to store in */ - scandesc->rs_cbuf, /* buffer associated with this - * tuple */ - false); /* don't pfree this pointer */ - else - ExecClearTuple(slot); - - return slot; + return false; } /* @@ -113,23 +98,6 @@ SeqRecheck(SeqScanState *node, TupleTableSlot *slot) } /* ---------------------------------------------------------------- - * ExecSeqScan(node) - * - * Scans the relation sequentially and returns the next qualifying - * tuple. - * We call the ExecScan() routine and pass it the appropriate - * access method functions. - * ---------------------------------------------------------------- - */ -TupleTableSlot * -ExecSeqScan(SeqScanState *node) -{ - return ExecScan((ScanState *) node, - (ExecScanAccessMtd) SeqNext, - (ExecScanRecheckMtd) SeqRecheck); -} - -/* ---------------------------------------------------------------- * InitScanRelation * * Set up to access the scan relation. @@ -154,13 +122,12 @@ InitScanRelation(SeqScanState *node, EState *estate, int eflags) ExecAssignScanType(&node->ss, RelationGetDescr(currentRelation)); } - /* ---------------------------------------------------------------- * ExecInitSeqScan * ---------------------------------------------------------------- */ SeqScanState * -ExecInitSeqScan(SeqScan *node, EState *estate, int eflags) +ExecInitSeqScan(SeqScan *node, EState *estate, int eflags, PlanState *parent) { SeqScanState *scanstate; @@ -177,6 +144,7 @@ ExecInitSeqScan(SeqScan *node, EState *estate, int eflags) scanstate = makeNode(SeqScanState); scanstate->ss.ps.plan = (Plan *) node; scanstate->ss.ps.state = estate; + scanstate->ss.ps.parent = parent; /* * Miscellaneous initialization diff --git a/src/include/access/heapam.h b/src/include/access/heapam.h index 7e85510d2f..a0b826e88d 100644 --- a/src/include/access/heapam.h +++ b/src/include/access/heapam.h @@ -126,6 +126,15 @@ extern void heap_rescan_set_params(HeapScanDesc scan, ScanKey key, bool 
allow_strat, bool allow_sync, bool allow_pagemode); extern void heap_endscan(HeapScanDesc scan); extern HeapTuple heap_getnext(HeapScanDesc scan, ScanDirection direction); +/* forward decls because now we need to know about PlanState */ +typedef struct PlanState PlanState; +typedef struct SeqScanState SeqScanState; +extern void heappushtups(HeapScanDesc scan, + ScanDirection dir, + int nkeys, + ScanKey key, + PlanState *node, + SeqScanState *pusher); extern Size heap_parallelscan_estimate(Snapshot snapshot); extern void heap_parallelscan_initialize(ParallelHeapScanDesc target, diff --git a/src/include/executor/nodeSeqscan.h b/src/include/executor/nodeSeqscan.h index 92b305e138..21b8e42b6e 100644 --- a/src/include/executor/nodeSeqscan.h +++ b/src/include/executor/nodeSeqscan.h @@ -15,10 +15,15 @@ #define NODESEQSCAN_H #include "access/parallel.h" +#include "access/relscan.h" #include "nodes/execnodes.h" +#include "executor/executor.h" +#include "utils/memutils.h" +#include "miscadmin.h" -extern SeqScanState *ExecInitSeqScan(SeqScan *node, EState *estate, int eflags); -extern TupleTableSlot *ExecSeqScan(SeqScanState *node); +extern SeqScanState *ExecInitSeqScan(SeqScan *node, EState *estate, int eflags, + PlanState *parent); +extern bool pushTupleToSeqScan(SeqScanState *node); extern void ExecEndSeqScan(SeqScanState *node); extern void ExecReScanSeqScan(SeqScanState *node); @@ -27,4 +32,144 @@ extern void ExecSeqScanEstimate(SeqScanState *node, ParallelContext *pcxt); extern void ExecSeqScanInitializeDSM(SeqScanState *node, ParallelContext *pcxt); extern void ExecSeqScanInitializeWorker(SeqScanState *node, shm_toc *toc); +/* inline functions decls and implementations */ +#pragma GCC diagnostic warning "-Winline" +static inline void SeqPushNull(PlanState *node, SeqScanState *pusher); +static inline TupleTableSlot *SeqStoreTuple(SeqScanState *node, + HeapTuple tuple); +static inline bool SeqPushHeapTuple(HeapTuple tuple, PlanState *node, + SeqScanState *pusher); + +/* 
push NULL to the parent, signaling that we are done */ +static inline void +SeqPushNull(PlanState *node, SeqScanState *pusher) +{ + ProjectionInfo *projInfo; + TupleTableSlot *slot; + + projInfo = pusher->ss.ps.ps_ProjInfo; + slot = pusher->ss.ss_ScanTupleSlot; + + ExecClearTuple(slot); + + if (projInfo) + pushTuple(ExecClearTuple(projInfo->pi_slot), node, + (PlanState *) pusher); + else + pushTuple(slot, node, + (PlanState *) pusher); +} + +/* + * HeapTuple --> node->ss_ScanTupleSlot, part of original SeqNext after + * heap_getnext + */ +static inline TupleTableSlot * +SeqStoreTuple(SeqScanState *node, HeapTuple tuple) +{ + HeapScanDesc scandesc; + TupleTableSlot *slot; + + /* + * get information from the scan state + */ + scandesc = node->ss.ss_currentScanDesc; + slot = node->ss.ss_ScanTupleSlot; + + Assert(tuple); + + /* + * save the tuple and the buffer returned to us by the access methods in + * our scan tuple slot. Note: we pass 'false' because tuples returned by + * heap_getnext() are pointers onto disk pages and were not created with + * palloc() and so should not be pfree()'d. Note also that ExecStoreTuple + * will increment the refcount of the buffer; the refcount will not be + * dropped until the tuple table slot is cleared. + */ + ExecStoreTuple(tuple, /* tuple to store */ + slot, /* slot to store in */ + scandesc->rs_cbuf, /* buffer associated with this + * tuple */ + false); /* don't pfree this pointer */ + return slot; +} + +/* Push ready HeapTuple from SeqScanState + * + * check qual for the tuple and push it. Tuple must be not NULL. 
+ * Returns true, if parent accepts more tuples, false otherwise + */ +static inline bool SeqPushHeapTuple(HeapTuple tuple, PlanState *node, + SeqScanState *pusher) +{ + ExprContext *econtext; + List *qual; + ProjectionInfo *projInfo; + TupleTableSlot *slot; + + if (tuple->t_data == NULL) + { + SeqPushNull(node, pusher); + return false; + } + + /* + * Fetch data from node + */ + qual = pusher->ss.ps.qual; + projInfo = pusher->ss.ps.ps_ProjInfo; + econtext = pusher->ss.ps.ps_ExprContext; + + CHECK_FOR_INTERRUPTS(); + + slot = SeqStoreTuple(pusher, tuple); + + /* + * If we have neither a qual to check nor a projection to do, just skip + * all the overhead and return the raw scan tuple. + */ + if (!qual && !projInfo) + { + return pushTuple(slot, node, (PlanState *) pusher); + } + + ResetExprContext(econtext); + /* + * place the current tuple into the expr context + */ + econtext->ecxt_scantuple = slot; + + /* + * check that the current tuple satisfies the qual-clause + * + * check for non-nil qual here to avoid a function call to ExecQual() + * when the qual is nil ... saves only a few cycles, but they add up + * ... + */ + if (!qual || ExecQual(qual, econtext, false)) + { + /* + * Found a satisfactory scan tuple. + */ + if (projInfo) + { + /* + * Form a projection tuple, store it in the result tuple slot + * and push it --- unless we find we can project no tuples + * from this scan tuple, in which case continue scan. + */ + slot = ExecProject(projInfo); + } + /* + * Here, we aren't projecting, so just push scan tuple. + */ + return pushTuple(slot, node, (PlanState *) pusher); + } + else + InstrCountFiltered1(pusher, 1); + + return true; +} + + #endif /* NODESEQSCAN_H */ -- 2.11.0