diff --git a/src/backend/access/Makefile b/src/backend/access/Makefile index 21721b4..823d5c3 100644 --- a/src/backend/access/Makefile +++ b/src/backend/access/Makefile @@ -8,6 +8,6 @@ subdir = src/backend/access top_builddir = ../../.. include $(top_builddir)/src/Makefile.global -SUBDIRS = brin common gin gist hash heap index nbtree rmgrdesc spgist transam +SUBDIRS = brin common gin gist hash heap index nbtree rmgrdesc shmmq spgist transam include $(top_srcdir)/src/backend/common.mk diff --git a/src/backend/access/shmmq/Makefile b/src/backend/access/shmmq/Makefile new file mode 100644 index 0000000..aeae8d9 --- /dev/null +++ b/src/backend/access/shmmq/Makefile @@ -0,0 +1,17 @@ +#------------------------------------------------------------------------- +# +# Makefile-- +# Makefile for access/shmmq +# +# IDENTIFICATION +# src/backend/access/shmmq/Makefile +# +#------------------------------------------------------------------------- + +subdir = src/backend/access/shmmq +top_builddir = ../../../.. +include $(top_builddir)/src/Makefile.global + +OBJS = shmmqam.o + +include $(top_srcdir)/src/backend/common.mk diff --git a/src/backend/access/shmmq/shmmqam.c b/src/backend/access/shmmq/shmmqam.c new file mode 100644 index 0000000..d8bd596 --- /dev/null +++ b/src/backend/access/shmmq/shmmqam.c @@ -0,0 +1,92 @@ +/*------------------------------------------------------------------------- + * + * shmmqam.c + * shared memory queue access method code + * + * Portions Copyright (c) 1996-2014, PostgreSQL Global Development Group + * Portions Copyright (c) 1994, Regents of the University of California + * + * + * IDENTIFICATION + * src/backend/access/shmmq/shmmqam.c + * + * + * INTERFACE ROUTINES + * shm_getnext - retrieve next tuple in queue + * + * NOTES + * This file contains the shmmq_ routines which implement + * the POSTGRES shared memory access method used for all POSTGRES + * relations. + * + *------------------------------------------------------------------------- + */ +#include "postgres.h" + +#include "access/htup.h" +#include "access/htup_details.h" +#include "access/shmmqam.h" +//#include "access/tupdesc.h" +#include "fmgr.h" +#include "libpq/libpq.h" +#include "libpq/pqformat.h" +#include "utils/lsyscache.h" + + + +/* + * ExecInitWorkerResult + * + * Initializes the result state to retrieve tuples from worker backends. + */ +worker_result +ExecInitWorkerResult(void) +{ + worker_result workerResult; + + workerResult = palloc0(sizeof(worker_result_state)); + + return workerResult; +} + +/* + * shm_getnext + * + * Get the next tuple from shared memory queue. This function + * is reponsible for fetching tuples from all the queues associated + * with worker backends used in parallel sequential scan. 
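+ *
+ * The loop below first polls the worker tuple queues (blocking only
+ * once the local scan is exhausted) and then falls back to the
+ * master backend's own heap scan; *fromheap tells the caller which
+ * source produced the tuple.  NULL is returned only after both the
+ * workers and the local scan are done.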
+ */ +HeapTuple +shm_getnext(HeapScanDesc scanDesc, worker_result resultState, + TupleQueueFunnel *funnel, ScanDirection direction, + bool *fromheap) +{ + HeapTuple tup; + + while (!resultState->all_workers_done || !resultState->local_scan_done) + { + if (!resultState->all_workers_done) + { + /* wait only if local scan is done */ + tup = TupleQueueFunnelNext(funnel, !resultState->local_scan_done, + &resultState->all_workers_done); + if (HeapTupleIsValid(tup)) + { + *fromheap = false; + return tup; + } + } + if (!resultState->local_scan_done) + { + tup = heap_getnext(scanDesc, direction); + if (HeapTupleIsValid(tup)) + { + *fromheap = true; + return tup; + } + resultState->local_scan_done = true; + } + } + + return NULL; +} diff --git a/src/backend/commands/explain.c b/src/backend/commands/explain.c index a951c55..8410afa 100644 --- a/src/backend/commands/explain.c +++ b/src/backend/commands/explain.c @@ -721,6 +721,7 @@ ExplainPreScanNode(PlanState *planstate, Bitmapset **rels_used) switch (nodeTag(plan)) { case T_SeqScan: + case T_Funnel: case T_IndexScan: case T_IndexOnlyScan: case T_BitmapHeapScan: @@ -916,6 +917,9 @@ ExplainNode(PlanState *planstate, List *ancestors, case T_SeqScan: pname = sname = "Seq Scan"; break; + case T_Funnel: + pname = sname = "Funnel"; + break; case T_IndexScan: pname = sname = "Index Scan"; break; @@ -1065,6 +1069,7 @@ ExplainNode(PlanState *planstate, List *ancestors, switch (nodeTag(plan)) { case T_SeqScan: + case T_Funnel: case T_BitmapHeapScan: case T_TidScan: case T_SubqueryScan: @@ -1206,6 +1211,24 @@ ExplainNode(PlanState *planstate, List *ancestors, } /* + * Aggregate instrumentation information of all the backend + * workers for parallel sequence scan. + */ + if (es->analyze && nodeTag(plan) == T_Funnel) + { + int i; + Instrumentation *instrument_worker; + int nworkers = ((FunnelState *)planstate)->pcxt->nworkers; + char *inst_info_workers = ((FunnelState *)planstate)->inst_options_space; + + for (i = 0; i < nworkers; i++) + { + instrument_worker = (Instrumentation *)(inst_info_workers + (i * sizeof(Instrumentation))); + InstrAggNode(planstate->instrument, instrument_worker); + } + } + + /* * We have to forcibly clean up the instrumentation state because we * haven't done ExecutorEnd yet. This is pretty grotty ... 
* @@ -1331,6 +1354,14 @@ ExplainNode(PlanState *planstate, List *ancestors, show_instrumentation_count("Rows Removed by Filter", 1, planstate, es); break; + case T_Funnel: + show_scan_qual(plan->qual, "Filter", planstate, ancestors, es); + if (plan->qual) + show_instrumentation_count("Rows Removed by Filter", 1, + planstate, es); + ExplainPropertyInteger("Number of Workers", + ((Funnel *) plan)->num_workers, es); + break; case T_FunctionScan: if (es->verbose) { @@ -2214,6 +2245,7 @@ ExplainTargetRel(Plan *plan, Index rti, ExplainState *es) switch (nodeTag(plan)) { case T_SeqScan: + case T_Funnel: case T_IndexScan: case T_IndexOnlyScan: case T_BitmapHeapScan: diff --git a/src/backend/executor/Makefile b/src/backend/executor/Makefile index af707b0..991ff51 100644 --- a/src/backend/executor/Makefile +++ b/src/backend/executor/Makefile @@ -16,14 +16,15 @@ OBJS = execAmi.o execCurrent.o execGrouping.o execJunk.o execMain.o \ execProcnode.o execQual.o execScan.o execTuples.o \ execUtils.o functions.o instrument.o nodeAppend.o nodeAgg.o \ nodeBitmapAnd.o nodeBitmapOr.o \ - nodeBitmapHeapscan.o nodeBitmapIndexscan.o nodeCustom.o nodeHash.o \ - nodeHashjoin.o nodeIndexscan.o nodeIndexonlyscan.o \ + nodeBitmapHeapscan.o nodeBitmapIndexscan.o nodeCustom.o nodeFunnel.o \ + nodeHash.o nodeHashjoin.o nodeIndexscan.o nodeIndexonlyscan.o \ nodeLimit.o nodeLockRows.o \ nodeMaterial.o nodeMergeAppend.o nodeMergejoin.o nodeModifyTable.o \ nodeNestloop.o nodeFunctionscan.o nodeRecursiveunion.o nodeResult.o \ - nodeSeqscan.o nodeSetOp.o nodeSort.o nodeUnique.o \ - nodeValuesscan.o nodeCtescan.o nodeWorktablescan.o \ + nodeSeqscan.o nodePartialSeqscan.o nodeSetOp.o nodeSort.o \ + nodeUnique.o nodeValuesscan.o nodeCtescan.o nodeWorktablescan.o \ nodeGroup.o nodeSubplan.o nodeSubqueryscan.o nodeTidscan.o \ - nodeForeignscan.o nodeWindowAgg.o tstoreReceiver.o spi.o + nodeForeignscan.o nodeWindowAgg.o tqueue.o tstoreReceiver.o \ + spi.o include $(top_srcdir)/src/backend/common.mk diff --git a/src/backend/executor/execCurrent.c b/src/backend/executor/execCurrent.c index 1c8be25..f13b7bc 100644 --- a/src/backend/executor/execCurrent.c +++ b/src/backend/executor/execCurrent.c @@ -261,6 +261,8 @@ search_plan_tree(PlanState *node, Oid table_oid) * Relation scan nodes can all be treated alike */ case T_SeqScanState: + case T_PartialSeqScanState: + case T_FunnelState: case T_IndexScanState: case T_IndexOnlyScanState: case T_BitmapHeapScanState: diff --git a/src/backend/executor/execMain.c b/src/backend/executor/execMain.c index 07526e8..9a3e285 100644 --- a/src/backend/executor/execMain.c +++ b/src/backend/executor/execMain.c @@ -181,6 +181,8 @@ standard_ExecutorStart(QueryDesc *queryDesc, int eflags) estate->es_param_exec_vals = (ParamExecData *) palloc0(queryDesc->plannedstmt->nParamExec * sizeof(ParamExecData)); + estate->toc = queryDesc->toc; + /* * If non-read-only query, set the command ID to mark output tuples with */ diff --git a/src/backend/executor/execProcnode.c b/src/backend/executor/execProcnode.c index 9892499..1a1275c 100644 --- a/src/backend/executor/execProcnode.c +++ b/src/backend/executor/execProcnode.c @@ -100,6 +100,8 @@ #include "executor/nodeMergejoin.h" #include "executor/nodeModifyTable.h" #include "executor/nodeNestloop.h" +#include "executor/nodePartialSeqscan.h" +#include "executor/nodeFunnel.h" #include "executor/nodeRecursiveunion.h" #include "executor/nodeResult.h" #include "executor/nodeSeqscan.h" @@ -190,6 +192,16 @@ ExecInitNode(Plan *node, EState *estate, int eflags) estate, eflags); 
break; + case T_PartialSeqScan: + result = (PlanState *) ExecInitPartialSeqScan((PartialSeqScan *) node, + estate, eflags); + break; + + case T_Funnel: + result = (PlanState *) ExecInitFunnel((Funnel *) node, + estate, eflags); + break; + case T_IndexScan: result = (PlanState *) ExecInitIndexScan((IndexScan *) node, estate, eflags); @@ -406,6 +418,14 @@ ExecProcNode(PlanState *node) result = ExecSeqScan((SeqScanState *) node); break; + case T_PartialSeqScanState: + result = ExecPartialSeqScan((PartialSeqScanState *) node); + break; + + case T_FunnelState: + result = ExecFunnel((FunnelState *) node); + break; + case T_IndexScanState: result = ExecIndexScan((IndexScanState *) node); break; @@ -644,6 +664,14 @@ ExecEndNode(PlanState *node) ExecEndSeqScan((SeqScanState *) node); break; + case T_PartialSeqScanState: + ExecEndPartialSeqScan((PartialSeqScanState *) node); + break; + + case T_FunnelState: + ExecEndFunnel((FunnelState *) node); + break; + case T_IndexScanState: ExecEndIndexScan((IndexScanState *) node); break; diff --git a/src/backend/executor/execScan.c b/src/backend/executor/execScan.c index 3f0d809..caf9855 100644 --- a/src/backend/executor/execScan.c +++ b/src/backend/executor/execScan.c @@ -191,13 +191,20 @@ ExecScan(ScanState *node, * check for non-nil qual here to avoid a function call to ExecQual() * when the qual is nil ... saves only a few cycles, but they add up * ... + * + * check for non-heap tuples (can get such tuples from shared memory + * message queue's in case of parallel query), for such tuples no need + * to perform qualification or projection as for them the same is done + * by worker backend. This case will happen only for parallel query + * where we push down the qualification and projection (targetlist) + * information. */ - if (!qual || ExecQual(qual, econtext, false)) + if (!slot->tts_fromheap || !qual || ExecQual(qual, econtext, false)) { /* * Found a satisfactory scan tuple. */ - if (projInfo) + if (projInfo && slot->tts_fromheap) { /* * Form a projection tuple, store it in the result tuple slot @@ -211,6 +218,23 @@ ExecScan(ScanState *node, return resultSlot; } } + else if (projInfo && !slot->tts_fromheap) + { + /* + * Store the tuple we got from shared memory tuple queue + * in projection slot as the worker backend wtakes care + * of doing projection. We don't need to free this tuple + * as this is pointing to scan tuple slot which will take + * care of freeing it. 
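+ *
+ * We simply re-point the projection slot at the tuple already held
+ * in the scan tuple slot; no qual evaluation or projection happens
+ * here because the worker backend has already applied both before
+ * queueing the tuple.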
+ */ + ExecStoreTuple(econtext->ecxt_scantuple->tts_tuple, /* tuple to store */ + projInfo->pi_slot, /* slot to store in */ + InvalidBuffer, /* buffer associated with this + * tuple */ + false); /* pfree this pointer */ + + return projInfo->pi_slot; + } else { /* diff --git a/src/backend/executor/execTuples.c b/src/backend/executor/execTuples.c index 753754d..4c5bd88 100644 --- a/src/backend/executor/execTuples.c +++ b/src/backend/executor/execTuples.c @@ -123,6 +123,7 @@ MakeTupleTableSlot(void) slot->tts_values = NULL; slot->tts_isnull = NULL; slot->tts_mintuple = NULL; + slot->tts_fromheap = true; return slot; } @@ -473,6 +474,8 @@ ExecClearTuple(TupleTableSlot *slot) /* slot in which to store tuple */ slot->tts_isempty = true; slot->tts_nvalid = 0; + slot->tts_fromheap = true; + return slot; } diff --git a/src/backend/executor/execUtils.c b/src/backend/executor/execUtils.c index 022041b..79eeaee 100644 --- a/src/backend/executor/execUtils.c +++ b/src/backend/executor/execUtils.c @@ -145,6 +145,8 @@ CreateExecutorState(void) estate->es_auxmodifytables = NIL; + estate->toc = NULL; + estate->es_per_tuple_exprcontext = NULL; estate->es_epqTuple = NULL; diff --git a/src/backend/executor/instrument.c b/src/backend/executor/instrument.c index f5351eb..56e509d 100644 --- a/src/backend/executor/instrument.c +++ b/src/backend/executor/instrument.c @@ -21,6 +21,8 @@ BufferUsage pgBufferUsage; static void BufferUsageAccumDiff(BufferUsage *dst, const BufferUsage *add, const BufferUsage *sub); +static void +BufferUsageAdd(BufferUsage *dst, const BufferUsage *add); /* Allocate new instrumentation structure(s) */ @@ -127,6 +129,28 @@ InstrEndLoop(Instrumentation *instr) instr->tuplecount = 0; } +/* + * Aggregate the instrumentation information. This is used + * to aggregate the information of worker backends. We only + * need to sum the buffer usage and tuple count statistics as + * for other timing related statistics it is sufficient to + * have the master backend's information. 
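+ *
+ * This is currently invoked from ExplainNode(), once per worker, on
+ * the Instrumentation structures stored in the Funnel node's shared
+ * instrumentation space.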
+ */ +void +InstrAggNode(Instrumentation *instr1, Instrumentation *instr2) +{ + /* count the returned tuples */ + instr1->tuplecount += instr2->tuplecount; + + instr1->nfiltered1 += instr2->nfiltered1; + instr1->nfiltered2 += instr2->nfiltered2; + + /* Add delta of buffer usage since entry to node's totals */ + if (instr1->need_bufusage) + BufferUsageAdd(&instr1->bufusage, &instr2->bufusage); + +} + /* dst += add - sub */ static void BufferUsageAccumDiff(BufferUsage *dst, @@ -148,3 +172,21 @@ BufferUsageAccumDiff(BufferUsage *dst, INSTR_TIME_ACCUM_DIFF(dst->blk_write_time, add->blk_write_time, sub->blk_write_time); } + +/* dst += add */ +static void +BufferUsageAdd(BufferUsage *dst, const BufferUsage *add) +{ + dst->shared_blks_hit += add->shared_blks_hit; + dst->shared_blks_read += add->shared_blks_read; + dst->shared_blks_dirtied += add->shared_blks_dirtied; + dst->shared_blks_written += add->shared_blks_written; + dst->local_blks_hit += add->local_blks_hit; + dst->local_blks_read += add->local_blks_read; + dst->local_blks_dirtied += add->local_blks_dirtied; + dst->local_blks_written += add->local_blks_written; + dst->temp_blks_read += add->temp_blks_read; + dst->temp_blks_written += add->temp_blks_written; + INSTR_TIME_ADD(dst->blk_read_time, add->blk_read_time); + INSTR_TIME_ADD(dst->blk_write_time, add->blk_write_time); +} diff --git a/src/backend/executor/nodeFunnel.c b/src/backend/executor/nodeFunnel.c new file mode 100644 index 0000000..71f6daa --- /dev/null +++ b/src/backend/executor/nodeFunnel.c @@ -0,0 +1,301 @@ +/*------------------------------------------------------------------------- + * + * nodeFunnel.c + * Support routines for parallel sequential scans of relations. + * + * Portions Copyright (c) 1996-2015, PostgreSQL Global Development Group + * Portions Copyright (c) 1994, Regents of the University of California + * + * + * IDENTIFICATION + * src/backend/executor/nodeFunnel.c + * + *------------------------------------------------------------------------- + */ +/* + * INTERFACE ROUTINES + * ExecFunnel scans a relation. + * FunnelNext retrieve next tuple from either heap or shared memory segment. + * ExecInitFunnel creates and initializes a parallel seqscan node. + * ExecEndFunnel releases any storage allocated. + */ +#include "postgres.h" + +#include "access/relscan.h" +#include "access/shmmqam.h" +#include "access/xact.h" +#include "commands/dbcommands.h" +#include "executor/execdebug.h" +#include "executor/nodeSeqscan.h" +#include "executor/nodeFunnel.h" +#include "postmaster/backendworker.h" +#include "utils/rel.h" + + + +/* ---------------------------------------------------------------- + * Scan Support + * ---------------------------------------------------------------- + */ + +/* ---------------------------------------------------------------- + * FunnelNext + * + * This is a workhorse for ExecFunnel + * ---------------------------------------------------------------- + */ +static TupleTableSlot * +FunnelNext(FunnelState *node) +{ + HeapTuple tuple; + HeapScanDesc scandesc; + EState *estate; + ScanDirection direction; + TupleTableSlot *slot; + bool fromheap = true; + + /* + * get information from the estate and scan state + */ + scandesc = node->ss.ss_currentScanDesc; + estate = node->ss.ps.state; + direction = estate->es_direction; + slot = node->ss.ss_ScanTupleSlot; + + /* + * get the next tuple from the table based on result tuple descriptor. 
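+ *
+ * shm_getnext() hands back either a tuple funnelled from one of the
+ * worker queues or one from the master's own portion of the parallel
+ * heap scan; 'fromheap' records which source it came from.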
+ */ + tuple = shm_getnext(scandesc, + node->pss_workerResult, + node->funnel, + direction, + &fromheap); + + slot->tts_fromheap = fromheap; + + /* + * save the tuple and the buffer returned to us by the access methods in + * our scan tuple slot and return the slot. Note: we pass '!fromheap' + * because tuples returned by shm_getnext() are either pointers that are + * created with palloc() or are pointers onto disk pages and so it should + * be pfree()'d accordingly. Note also that ExecStoreTuple will increment + * the refcount of the buffer; the refcount will not be dropped until the + * tuple table slot is cleared. + */ + if (tuple) + ExecStoreTuple(tuple, /* tuple to store */ + slot, /* slot to store in */ + fromheap ? scandesc->rs_cbuf : InvalidBuffer, /* buffer associated with this + * tuple */ + !fromheap); /* pfree this pointer if not from heap */ + else + ExecClearTuple(slot); + + return slot; +} + +/* + * FunnelRecheck -- access method routine to recheck a tuple in EvalPlanQual + */ +static bool +FunnelRecheck(SeqScanState *node, TupleTableSlot *slot) +{ + /* + * Note that unlike IndexScan, Funnel never use keys in + * heap_beginscan (and this is very bad) - so, here + * we do not check are keys ok or not. + */ + return true; +} + +/* ---------------------------------------------------------------- + * InitFunnelRelation + * + * Set up to access the scan relation. + * ---------------------------------------------------------------- + */ +static void +InitFunnelRelation(FunnelState *node, EState *estate, int eflags) +{ + Relation currentRelation; + HeapScanDesc currentScanDesc; + ParallelHeapScanDesc pscan; + + /* + * get the relation object id from the relid'th entry in the range table, + * open that relation and acquire appropriate lock on it. + */ + currentRelation = ExecOpenScanRelation(estate, + ((SeqScan *) node->ss.ps.plan)->scanrelid, + eflags); + + /* Initialize the workers required to perform parallel scan. */ + InitializeParallelWorkers(node->ss.ps.plan->lefttree, + estate, + currentRelation, + &node->inst_options_space, + &node->responseq, + &node->pcxt, + &pscan, + ((Funnel *)(node->ss.ps.plan))->num_workers); + + currentScanDesc = heap_beginscan_parallel(currentRelation, pscan); + + node->ss.ss_currentRelation = currentRelation; + node->ss.ss_currentScanDesc = currentScanDesc; + + /* and report the scan tuple slot's rowtype */ + ExecAssignScanType(&node->ss, RelationGetDescr(currentRelation)); +} + +/* ---------------------------------------------------------------- + * ExecInitFunnel + * ---------------------------------------------------------------- + */ +FunnelState * +ExecInitFunnel(Funnel *node, EState *estate, int eflags) +{ + FunnelState *funnelstate; + + /* + * Once upon a time it was possible to have an outerPlan of a SeqScan, but + * not any more. 
+ */ + Assert(outerPlan(node) == NULL); + Assert(innerPlan(node) == NULL); + + /* + * create state structure + */ + funnelstate = makeNode(FunnelState); + funnelstate->ss.ps.plan = (Plan *) node; + funnelstate->ss.ps.state = estate; + funnelstate->fs_workersReady = false; + + /* + * Miscellaneous initialization + * + * create expression context for node + */ + ExecAssignExprContext(estate, &funnelstate->ss.ps); + + /* + * initialize child expressions + */ + funnelstate->ss.ps.targetlist = (List *) + ExecInitExpr((Expr *) node->scan.plan.targetlist, + (PlanState *) funnelstate); + funnelstate->ss.ps.qual = (List *) + ExecInitExpr((Expr *) node->scan.plan.qual, + (PlanState *) funnelstate); + + /* + * tuple table initialization + */ + ExecInitResultTupleSlot(estate, &funnelstate->ss.ps); + ExecInitScanTupleSlot(estate, &funnelstate->ss); + + InitFunnelRelation(funnelstate, estate, eflags); + + funnelstate->ss.ps.ps_TupFromTlist = false; + + /* + * Initialize result tuple type and projection info. + */ + ExecAssignResultTypeFromTL(&funnelstate->ss.ps); + ExecAssignScanProjectionInfo(&funnelstate->ss); + + funnelstate->pss_workerResult = ExecInitWorkerResult(); + + return funnelstate; +} + +/* ---------------------------------------------------------------- + * ExecFunnel(node) + * + * Scans the relation via multiple workers and returns + * the next qualifying tuple. + * We call the ExecScan() routine and pass it the appropriate + * access method functions. + * ---------------------------------------------------------------- + */ +TupleTableSlot * +ExecFunnel(FunnelState *node) +{ + int i; + + /* + * if parallel context is set and workers are not + * registered, register them now. + */ + if (node->pcxt && !node->fs_workersReady) + { + /* Register backend workers. */ + LaunchParallelWorkers(node->pcxt); + + node->funnel = CreateTupleQueueFunnel(); + + for (i = 0; i < node->pcxt->nworkers; ++i) + { + shm_mq_set_handle((node->responseq)[i], node->pcxt->worker[i].bgwhandle); + RegisterTupleQueueOnFunnel(node->funnel, (node->responseq)[i]); + } + + node->fs_workersReady = true; + } + + return ExecScan((ScanState *) &node->ss, + (ExecScanAccessMtd) FunnelNext, + (ExecScanRecheckMtd) FunnelRecheck); +} + +/* ---------------------------------------------------------------- + * ExecEndFunnel + * + * frees any storage allocated through C routines. + * ---------------------------------------------------------------- + */ +void +ExecEndFunnel(FunnelState *node) +{ + Relation relation; + HeapScanDesc scanDesc; + + /* + * get information from node + */ + relation = node->ss.ss_currentRelation; + scanDesc = node->ss.ss_currentScanDesc; + + /* + * Free the exprcontext + */ + ExecFreeExprContext(&node->ss.ps); + + /* + * clean out the tuple table + */ + ExecClearTuple(node->ss.ps.ps_ResultTupleSlot); + ExecClearTuple(node->ss.ss_ScanTupleSlot); + + /* + * close heap scan + */ + heap_endscan(scanDesc); + + /* + * close the heap relation. + */ + ExecCloseScanRelation(relation); + + if (node->pcxt) + { + /* destroy the tuple queue */ + DestroyTupleQueueFunnel(node->funnel); + + /* destroy parallel context. 
*/ + DestroyParallelContext(node->pcxt); + + ExitParallelMode(); + } +} diff --git a/src/backend/executor/nodePartialSeqscan.c b/src/backend/executor/nodePartialSeqscan.c new file mode 100644 index 0000000..fb4efa3 --- /dev/null +++ b/src/backend/executor/nodePartialSeqscan.c @@ -0,0 +1,259 @@ +/*------------------------------------------------------------------------- + * + * nodePartialSeqscan.c + * Support routines for parallel sequential scans of relations. + * + * Portions Copyright (c) 1996-2015, PostgreSQL Global Development Group + * Portions Copyright (c) 1994, Regents of the University of California + * + * + * IDENTIFICATION + * src/backend/executor/nodeFunnel.c + * + *------------------------------------------------------------------------- + */ +/* + * INTERFACE ROUTINES + * ExecPartialSeqScan scans a relation. + * PartialSeqNext retrieve next tuple from either heap. + * ExecInitPartialSeqScan creates and initializes a partial seqscan node. + * ExecEndPartialSeqScan releases any storage allocated. + */ +#include "postgres.h" + +#include "access/relscan.h" +#include "access/shmmqam.h" +#include "access/xact.h" +#include "commands/dbcommands.h" +#include "executor/execdebug.h" +#include "executor/nodeSeqscan.h" +#include "executor/nodePartialSeqscan.h" +#include "postmaster/backendworker.h" +#include "utils/rel.h" + + + +/* ---------------------------------------------------------------- + * Scan Support + * ---------------------------------------------------------------- + */ + +/* ---------------------------------------------------------------- + * PartialSeqNext + * + * This is a workhorse for ExecPartialSeqScan + * ---------------------------------------------------------------- + */ +static TupleTableSlot * +PartialSeqNext(PartialSeqScanState *node) +{ + HeapTuple tuple; + HeapScanDesc scandesc; + EState *estate; + ScanDirection direction; + TupleTableSlot *slot; + + /* + * get information from the estate and scan state + */ + scandesc = node->ss_currentScanDesc; + estate = node->ps.state; + direction = estate->es_direction; + slot = node->ss_ScanTupleSlot; + + /* + * get the next tuple from the table + */ + tuple = heap_getnext(scandesc, direction); + + /* + * save the tuple and the buffer returned to us by the access methods in + * our scan tuple slot and return the slot. Note: we pass 'false' because + * tuples returned by heap_getnext() are pointers onto disk pages and were + * not created with palloc() and so should not be pfree()'d. Note also + * that ExecStoreTuple will increment the refcount of the buffer; the + * refcount will not be dropped until the tuple table slot is cleared. + */ + if (tuple) + ExecStoreTuple(tuple, /* tuple to store */ + slot, /* slot to store in */ + scandesc->rs_cbuf, /* buffer associated with this + * tuple */ + false); /* don't pfree this pointer */ + else + ExecClearTuple(slot); + + return slot; +} + +/* + * PartialSeqRecheck -- access method routine to recheck a tuple in EvalPlanQual + */ +static bool +PartialSeqRecheck(PartialSeqScanState *node, TupleTableSlot *slot) +{ + /* + * Note that unlike IndexScan, PartialSeqScan never use keys in + * heap_beginscan (and this is very bad) - so, here we do not + * check are keys ok or not. + */ + return true; +} + +/* ---------------------------------------------------------------- + * InitPartialScanRelation + * + * Set up to access the scan relation. 
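+ *
+ * The worker does not begin a scan of its own; it looks up the
+ * ParallelHeapScanDesc that the master backend stored in the dynamic
+ * shared memory TOC (under PARALLEL_KEY_SCAN) and joins that scan
+ * via heap_beginscan_parallel().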
+ * ---------------------------------------------------------------- + */ +static void +InitPartialScanRelation(PartialSeqScanState *node, EState *estate, int eflags) +{ + Relation currentRelation; + HeapScanDesc currentScanDesc; + ParallelHeapScanDesc pscan; + + /* + * get the relation object id from the relid'th entry in the range table, + * open that relation and acquire appropriate lock on it. + */ + currentRelation = ExecOpenScanRelation(estate, + ((Scan *) node->ps.plan)->scanrelid, + eflags); + + /* + * Parallel scan descriptor is initialized and stored in dynamic shared + * memory segment by master backend and parallel workers retrieve it + * from shared memory. + */ + Assert(estate->toc); + + pscan = shm_toc_lookup(estate->toc, PARALLEL_KEY_SCAN); + + currentScanDesc = heap_beginscan_parallel(currentRelation, pscan); + + node->ss_currentRelation = currentRelation; + node->ss_currentScanDesc = currentScanDesc; + + /* and report the scan tuple slot's rowtype */ + ExecAssignScanType(node, RelationGetDescr(currentRelation)); +} + +/* ---------------------------------------------------------------- + * ExecInitPartialSeqScan + * ---------------------------------------------------------------- + */ +PartialSeqScanState * +ExecInitPartialSeqScan(PartialSeqScan *node, EState *estate, int eflags) +{ + PartialSeqScanState *scanstate; + + /* + * Once upon a time it was possible to have an outerPlan of a SeqScan, but + * not any more. + */ + Assert(outerPlan(node) == NULL); + Assert(innerPlan(node) == NULL); + + /* + * create state structure + */ + scanstate = makeNode(PartialSeqScanState); + scanstate->ps.plan = (Plan *) node; + scanstate->ps.state = estate; + + /* + * Miscellaneous initialization + * + * create expression context for node + */ + ExecAssignExprContext(estate, &scanstate->ps); + + /* + * initialize child expressions + */ + scanstate->ps.targetlist = (List *) + ExecInitExpr((Expr *) node->plan.targetlist, + (PlanState *) scanstate); + scanstate->ps.qual = (List *) + ExecInitExpr((Expr *) node->plan.qual, + (PlanState *) scanstate); + + /* + * tuple table initialization + */ + ExecInitResultTupleSlot(estate, &scanstate->ps); + ExecInitScanTupleSlot(estate, scanstate); + + /* + * initialize scan relation + */ + InitPartialScanRelation(scanstate, estate, eflags); + + scanstate->ps.ps_TupFromTlist = false; + + /* + * Initialize result tuple type and projection info. + */ + ExecAssignResultTypeFromTL(&scanstate->ps); + ExecAssignScanProjectionInfo(scanstate); + + return scanstate; +} + +/* ---------------------------------------------------------------- + * ExecPartialSeqScan(node) + * + * Scans the relation via multiple workers and returns + * the next qualifying tuple. + * We call the ExecScan() routine and pass it the appropriate + * access method functions. + * ---------------------------------------------------------------- + */ +TupleTableSlot * +ExecPartialSeqScan(PartialSeqScanState *node) +{ + return ExecScan((ScanState *) node, + (ExecScanAccessMtd) PartialSeqNext, + (ExecScanRecheckMtd) PartialSeqRecheck); +} + +/* ---------------------------------------------------------------- + * ExecEndPartialSeqScan + * + * frees any storage allocated through C routines. 
+ * ---------------------------------------------------------------- + */ +void +ExecEndPartialSeqScan(PartialSeqScanState *node) +{ + Relation relation; + HeapScanDesc scanDesc; + + /* + * get information from node + */ + relation = node->ss_currentRelation; + scanDesc = node->ss_currentScanDesc; + + /* + * Free the exprcontext + */ + ExecFreeExprContext(&node->ps); + + /* + * clean out the tuple table + */ + ExecClearTuple(node->ps.ps_ResultTupleSlot); + ExecClearTuple(node->ss_ScanTupleSlot); + + /* + * close heap scan + */ + heap_endscan(scanDesc); + + /* + * close the heap relation. + */ + ExecCloseScanRelation(relation); +} diff --git a/src/backend/executor/tqueue.c b/src/backend/executor/tqueue.c new file mode 100644 index 0000000..ee4e03e --- /dev/null +++ b/src/backend/executor/tqueue.c @@ -0,0 +1,272 @@ +/*------------------------------------------------------------------------- + * + * tqueue.c + * Use shm_mq to send & receive tuples between parallel backends + * + * A DestReceiver of type DestTupleQueue, which is a TQueueDestReciever + * under the hood, writes tuples from the executor to a shm_mq. + * + * A TupleQueueFunnel helps manage the process of reading tuples from + * one or more shm_mq objects being used as tuple queues. + * + * Portions Copyright (c) 1996-2015, PostgreSQL Global Development Group + * Portions Copyright (c) 1994, Regents of the University of California + * + * IDENTIFICATION + * src/backend/executor/tqueue.c + * + *------------------------------------------------------------------------- + */ + +#include "postgres.h" + +#include "access/htup_details.h" +#include "executor/tqueue.h" +#include "miscadmin.h" + +typedef struct +{ + DestReceiver pub; + shm_mq_handle *handle; +} TQueueDestReceiver; + +struct TupleQueueFunnel +{ + int nqueues; + int maxqueues; + int nextqueue; + shm_mq_handle **queue; +}; + +/* + * Receive a tuple. + */ +static void +tqueueReceiveSlot(TupleTableSlot *slot, DestReceiver *self) +{ + TQueueDestReceiver *tqueue = (TQueueDestReceiver *) self; + HeapTuple tuple; + shm_mq_result result; + + tuple = ExecMaterializeSlot(slot); + result = shm_mq_send(tqueue->handle, tuple->t_len, tuple->t_data, false); + + if (result != SHM_MQ_SUCCESS) + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("unable to send tuples"))); +} + +/* + * Prepare to receive tuples from executor. + */ +static void +tqueueStartupReceiver(DestReceiver *self, int operation, TupleDesc typeinfo) +{ + /* do nothing */ +} + +/* + * Clean up at end of an executor run + */ +static void +tqueueShutdownReceiver(DestReceiver *self) +{ + /* do nothing */ +} + +/* + * Destroy receiver when done with it + */ +static void +tqueueDestroyReceiver(DestReceiver *self) +{ + pfree(self); +} + +/* + * Create a DestReceiver that writes tuples to a tuple queue. 
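+ *
+ * The shm_mq_handle to write to is not known at creation time; the
+ * caller must supply it afterwards via SetTupleQueueDestReceiverParams().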
+ */ +DestReceiver * +CreateTupleQueueDestReceiver(void) +{ + TQueueDestReceiver *self; + + self = (TQueueDestReceiver *) palloc0(sizeof(TQueueDestReceiver)); + + self->pub.receiveSlot = tqueueReceiveSlot; + self->pub.rStartup = tqueueStartupReceiver; + self->pub.rShutdown = tqueueShutdownReceiver; + self->pub.rDestroy = tqueueDestroyReceiver; + self->pub.mydest = DestTupleQueue; + + /* private fields will be set by SetTupleQueueDestReceiverParams */ + + return (DestReceiver *) self; +} + +/* + * Set parameters for a TupleQueueDestReceiver + */ +void +SetTupleQueueDestReceiverParams(DestReceiver *self, + shm_mq_handle *handle) +{ + TQueueDestReceiver *myState = (TQueueDestReceiver *) self; + + myState->handle = handle; +} + +/* + * Create a tuple queue funnel. + */ +TupleQueueFunnel * +CreateTupleQueueFunnel(void) +{ + TupleQueueFunnel *funnel = palloc0(sizeof(TupleQueueFunnel)); + + funnel->maxqueues = 8; + funnel->queue = palloc(funnel->maxqueues * sizeof(shm_mq_handle *)); + + return funnel; +} + +/* + * Destroy a tuple queue funnel. + */ +void +DestroyTupleQueueFunnel(TupleQueueFunnel *funnel) +{ + if (funnel) + { + pfree(funnel->queue); + pfree(funnel); + } +} + +/* + * Remember the shared memory queue handle in funnel. + */ +void +RegisterTupleQueueOnFunnel(TupleQueueFunnel *funnel, shm_mq_handle *handle) +{ + if (funnel->nqueues < funnel->maxqueues) + { + funnel->queue[funnel->nqueues++] = handle; + return; + } + + if (funnel->nqueues >= funnel->maxqueues) + { + int newsize = funnel->nqueues * 2; + + Assert(funnel->nqueues == funnel->maxqueues); + + funnel->queue = repalloc(funnel->queue, + newsize * sizeof(shm_mq_handle *)); + funnel->maxqueues = newsize; + } + + funnel->queue[funnel->nqueues++] = handle; +} + +/* + * Fetch a tuple from a tuple queue funnel. + * + * We try to read from the queues in round-robin fashion so as to avoid + * the situation where some workers get their tuples read expediently while + * others are barely ever serviced. + * + * Even when nowait = false, we read from the individual queues in + * non-blocking mode. Even when shm_mq_receive() returns SHM_MQ_WOULD_BLOCK, + * it can still accumulate bytes from a partially-read message, so doing it + * this way should outperform doing a blocking read on each queue in turn. + * + * The return value is NULL if there are no remaining queues or if + * nowait = true and no queue returned a tuple without blocking. *done, if + * not NULL, is set to true when there are no remaining queues and false in + * any other case. + */ +HeapTuple +TupleQueueFunnelNext(TupleQueueFunnel *funnel, bool nowait, bool *done) +{ + int waitpos = funnel->nextqueue; + + /* Corner case: called before adding any queues, or after all are gone. */ + if (funnel->nqueues == 0) + { + if (done != NULL) + *done = true; + return NULL; + } + + if (done != NULL) + *done = false; + + for (;;) + { + shm_mq_handle *mqh = funnel->queue[funnel->nextqueue]; + shm_mq_result result; + Size nbytes; + void *data; + + /* Attempt to read a message. */ + result = shm_mq_receive(mqh, &nbytes, &data, true); + + /* + * Normally, we advance funnel->nextqueue to the next queue at this + * point, but if we're pointing to a queue that we've just discovered + * is detached, then forget that queue and leave the pointer where it + * is. 
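+ *
+ * Forgetting a detached queue means compacting the array in place;
+ * if the removed slot lies before 'waitpos', we also shift 'waitpos'
+ * down so the "visited every queue" test below still fires at the
+ * right point in the round-robin.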
+ */ + if (result != SHM_MQ_DETACHED) + funnel->nextqueue = (funnel->nextqueue + 1) % funnel->nqueues; + else + { + --funnel->nqueues; + if (funnel->nqueues == 0) + { + if (done != NULL) + *done = true; + return NULL; + } + memcpy(&funnel->queue[funnel->nextqueue], + &funnel->queue[funnel->nextqueue + 1], + sizeof(shm_mq_handle *) + * (funnel->nqueues - funnel->nextqueue)); + if (funnel->nextqueue < waitpos) + --waitpos; + } + + /* If we got a message, return it. */ + if (result == SHM_MQ_SUCCESS) + { + HeapTupleData htup; + + /* + * The tuple data we just read from the queue is only valid + * until we again attempt to read from it. Copy the tuple into + * a single palloc'd chunk as callers will expect. + */ + ItemPointerSetInvalid(&htup.t_self); + htup.t_tableOid = InvalidOid; + htup.t_len = nbytes; + htup.t_data = data; + return heap_copytuple(&htup); + } + + /* + * If we've visited all of the queues, then we should either give up + * and return NULL (if we're in non-blocking mode) or wait for the + * process latch to be set (otherwise). + */ + if (funnel->nextqueue == waitpos) + { + if (nowait) + return NULL; + WaitLatch(MyLatch, WL_LATCH_SET, 0); + CHECK_FOR_INTERRUPTS(); + ResetLatch(MyLatch); + } + } +} diff --git a/src/backend/nodes/copyfuncs.c b/src/backend/nodes/copyfuncs.c index 9fe8008..e51fc38 100644 --- a/src/backend/nodes/copyfuncs.c +++ b/src/backend/nodes/copyfuncs.c @@ -354,6 +354,43 @@ _copySeqScan(const SeqScan *from) } /* + * _copyPartialSeqScan + */ +static PartialSeqScan * +_copyPartialSeqScan(const SeqScan *from) +{ + PartialSeqScan *newnode = makeNode(PartialSeqScan); + + /* + * copy node superclass fields + */ + CopyScanFields((const Scan *) from, (Scan *) newnode); + + return newnode; +} + +/* + * _copyFunnel + */ +static Funnel * +_copyFunnel(const Funnel *from) +{ + Funnel *newnode = makeNode(Funnel); + + /* + * copy node superclass fields + */ + CopyScanFields((const Scan *) from, (Scan *) newnode); + + /* + * copy remainder of node + */ + COPY_SCALAR_FIELD(num_workers); + + return newnode; +} + +/* * _copyIndexScan */ static IndexScan * @@ -4044,6 +4081,12 @@ copyObject(const void *from) case T_SeqScan: retval = _copySeqScan(from); break; + case T_PartialSeqScan: + retval = _copyPartialSeqScan(from); + break; + case T_Funnel: + retval = _copyFunnel(from); + break; case T_IndexScan: retval = _copyIndexScan(from); break; diff --git a/src/backend/nodes/outfuncs.c b/src/backend/nodes/outfuncs.c index 775f482..3382ab2 100644 --- a/src/backend/nodes/outfuncs.c +++ b/src/backend/nodes/outfuncs.c @@ -439,6 +439,24 @@ _outSeqScan(StringInfo str, const SeqScan *node) } static void +_outPartialSeqScan(StringInfo str, const SeqScan *node) +{ + WRITE_NODE_TYPE("PARTIALSEQSCAN"); + + _outScanInfo(str, (const Scan *) node); +} + +static void +_outFunnel(StringInfo str, const Funnel *node) +{ + WRITE_NODE_TYPE("FUNNEL"); + + _outScanInfo(str, (const Scan *) node); + + WRITE_UINT_FIELD(num_workers); +} + +static void _outIndexScan(StringInfo str, const IndexScan *node) { WRITE_NODE_TYPE("INDEXSCAN"); @@ -2886,6 +2904,12 @@ _outNode(StringInfo str, const void *obj) case T_SeqScan: _outSeqScan(str, obj); break; + case T_PartialSeqScan: + _outPartialSeqScan(str, obj); + break; + case T_Funnel: + _outFunnel(str, obj); + break; case T_IndexScan: _outIndexScan(str, obj); break; diff --git a/src/backend/nodes/params.c b/src/backend/nodes/params.c index fb803f8..aa278c5 100644 --- a/src/backend/nodes/params.c +++ b/src/backend/nodes/params.c @@ -16,9 +16,22 @@ #include "postgres.h" 
#include "nodes/params.h" +#include "storage/shmem.h" #include "utils/datum.h" #include "utils/lsyscache.h" +/* + * for each bind parameter, pass this structure followed by value + * except for pass-by-value parameters. + */ +typedef struct SerializedParamExternData +{ + Datum value; /*pass-by-val are directly stored */ + Size length; /* length of parameter value */ + bool isnull; /* is it NULL? */ + uint16 pflags; /* flag bits, see above */ + Oid ptype; /* parameter's datatype, or 0 */ +} SerializedParamExternData; /* * Copy a ParamListInfo structure. @@ -73,3 +86,187 @@ copyParamList(ParamListInfo from) return retval; } + +/* + * Estimate the amount of space required to serialize the bound + * parameters. + */ +Size +EstimateBoundParametersSpace(ParamListInfo paramInfo) +{ + Size size; + int i; + + /* Add space required for saving numParams */ + size = sizeof(int); + + if (paramInfo) + { + /* Add space required for saving the param data */ + for (i = 0; i < paramInfo->numParams; i++) + { + /* + * for each parameter, calculate the size of fixed part + * of parameter (SerializedParamExternData) and length of + * parameter value. + */ + ParamExternData *oprm; + int16 typLen; + bool typByVal; + Size length; + + length = sizeof(SerializedParamExternData); + + oprm = ¶mInfo->params[i]; + + get_typlenbyval(oprm->ptype, &typLen, &typByVal); + + /* + * pass-by-value parameters are directly stored in + * SerializedParamExternData, so no need of additional + * space for them. + */ + if (!(typByVal || oprm->isnull)) + { + length += datumGetSize(oprm->value, typByVal, typLen); + size = add_size(size, length); + + /* Allow space for terminating zero-byte */ + size = add_size(size, 1); + } + else + size = add_size(size, length); + } + } + + return size; +} + +/* + * Serialize the bind parameters into the memory, beginning at start_address. + * maxsize should be at least as large as the value returned by + * EstimateBoundParametersSpace. + */ +void +SerializeBoundParams(ParamListInfo paramInfo, Size maxsize, char *start_address) +{ + char *curptr; + SerializedParamExternData *retval; + int i; + + /* + * First, we store the number of bind parameters, if there is + * no bind parameter then no need to store any more information. + */ + if (paramInfo && paramInfo->numParams > 0) + * (int *) start_address = paramInfo->numParams; + else + { + * (int *) start_address = 0; + return; + } + curptr = start_address + sizeof(int); + + + for (i = 0; i < paramInfo->numParams; i++) + { + ParamExternData *oprm; + int16 typLen; + bool typByVal; + Size datumlength, length; + const char *s; + + Assert (curptr <= start_address + maxsize); + retval = (SerializedParamExternData*) curptr; + oprm = ¶mInfo->params[i]; + + retval->isnull = oprm->isnull; + retval->pflags = oprm->pflags; + retval->ptype = oprm->ptype; + retval->value = oprm->value; + + curptr = curptr + sizeof(SerializedParamExternData); + + if (retval->isnull) + continue; + + get_typlenbyval(oprm->ptype, &typLen, &typByVal); + + if (!typByVal) + { + datumlength = datumGetSize(oprm->value, typByVal, typLen); + s = (char *) DatumGetPointer(oprm->value); + memcpy(curptr, s, datumlength); + length = datumlength; + curptr[length] = '\0'; + retval->length = length; + curptr += length + 1; + } + } +} + +/* + * RestoreBoundParams + * Restore bind parameters from the specified address. + * + * The params are palloc'd in CurrentMemoryContext. 
+ */ +ParamListInfo +RestoreBoundParams(char *start_address) +{ + ParamListInfo retval; + Size size; + int num_params,i; + char *curptr; + + num_params = * (int *) start_address; + + if (num_params <= 0) + return NULL; + + /* sizeof(ParamListInfoData) includes the first array element */ + size = sizeof(ParamListInfoData) + + (num_params - 1) * sizeof(ParamExternData); + retval = (ParamListInfo) palloc(size); + retval->paramFetch = NULL; + retval->paramFetchArg = NULL; + retval->parserSetup = NULL; + retval->parserSetupArg = NULL; + retval->numParams = num_params; + + curptr = start_address + sizeof(int); + + for (i = 0; i < num_params; i++) + { + SerializedParamExternData *nprm; + char *s; + int16 typLen; + bool typByVal; + + nprm = (SerializedParamExternData *) curptr; + + /* copy the parameter info */ + retval->params[i].isnull = nprm->isnull; + retval->params[i].pflags = nprm->pflags; + retval->params[i].ptype = nprm->ptype; + retval->params[i].value = nprm->value; + + curptr = curptr + sizeof(SerializedParamExternData); + + if (nprm->isnull) + continue; + + get_typlenbyval(nprm->ptype, &typLen, &typByVal); + + if (!typByVal) + { + s = palloc(nprm->length + 1); + memcpy(s, curptr, nprm->length + 1); + retval->params[i].value = CStringGetDatum(s); + + curptr += nprm->length + 1; + } + } + + return retval; +} diff --git a/src/backend/nodes/readfuncs.c b/src/backend/nodes/readfuncs.c index 563209c..2bae475 100644 --- a/src/backend/nodes/readfuncs.c +++ b/src/backend/nodes/readfuncs.c @@ -1280,6 +1280,91 @@ _readRangeTblFunction(void) READ_DONE(); } +/* + * _readPlanInvalItem + */ +static PlanInvalItem * +_readPlanInvalItem(void) +{ + READ_LOCALS(PlanInvalItem); + + READ_INT_FIELD(cacheId); + READ_UINT_FIELD(hashValue); + + READ_DONE(); +} + +/* + * _readPlannedStmt + */ +static PlannedStmt * +_readPlannedStmt(void) +{ + READ_LOCALS(PlannedStmt); + + READ_ENUM_FIELD(commandType, CmdType); + READ_UINT_FIELD(queryId); + READ_BOOL_FIELD(hasReturning); + READ_BOOL_FIELD(hasModifyingCTE); + READ_BOOL_FIELD(canSetTag); + READ_BOOL_FIELD(transientPlan); + READ_NODE_FIELD(planTree); + READ_NODE_FIELD(rtable); + READ_NODE_FIELD(resultRelations); + READ_NODE_FIELD(utilityStmt); + READ_NODE_FIELD(subplans); + READ_BITMAPSET_FIELD(rewindPlanIDs); + READ_NODE_FIELD(rowMarks); + READ_NODE_FIELD(relationOids); + READ_NODE_FIELD(invalItems); + READ_INT_FIELD(nParamExec); + READ_BOOL_FIELD(hasRowSecurity); + + READ_DONE(); +} + +static Plan * +_readPlan(void) +{ + READ_LOCALS(Plan); + + READ_FLOAT_FIELD(startup_cost); + READ_FLOAT_FIELD(total_cost); + READ_FLOAT_FIELD(plan_rows); + READ_INT_FIELD(plan_width); + READ_NODE_FIELD(targetlist); + READ_NODE_FIELD(qual); + READ_NODE_FIELD(lefttree); + READ_NODE_FIELD(righttree); + READ_NODE_FIELD(initPlan); + READ_BITMAPSET_FIELD(extParam); + READ_BITMAPSET_FIELD(allParam); + + READ_DONE(); +} + +static Scan * +_readScan(void) +{ + Plan *local_plan; + READ_LOCALS(PartialSeqScan); + + local_plan = _readPlan(); + local_node->plan.startup_cost = local_plan->startup_cost; + local_node->plan.total_cost = local_plan->total_cost; + local_node->plan.plan_rows = local_plan->plan_rows; + local_node->plan.plan_width = local_plan->plan_width; + local_node->plan.targetlist = local_plan->targetlist; + local_node->plan.qual = local_plan->qual; + local_node->plan.lefttree = local_plan->lefttree; + local_node->plan.righttree = local_plan->righttree; + local_node->plan.initPlan = local_plan->initPlan; + local_node->plan.extParam = local_plan->extParam; + local_node->plan.allParam = 
local_plan->allParam; + READ_UINT_FIELD(scanrelid); + + READ_DONE(); +} /* * parseNodeString @@ -1409,6 +1494,12 @@ parseNodeString(void) return_value = _readNotifyStmt(); else if (MATCH("DECLARECURSOR", 13)) return_value = _readDeclareCursorStmt(); + else if (MATCH("PLANINVALITEM", 13)) + return_value = _readPlanInvalItem(); + else if (MATCH("PLANNEDSTMT", 11)) + return_value = _readPlannedStmt(); + else if (MATCH("PARTIALSEQSCAN", 14)) + return_value = _readScan(); else { elog(ERROR, "badly formatted node string \"%.32s\"...", token); diff --git a/src/backend/optimizer/path/Makefile b/src/backend/optimizer/path/Makefile index 6864a62..6e462b1 100644 --- a/src/backend/optimizer/path/Makefile +++ b/src/backend/optimizer/path/Makefile @@ -13,6 +13,6 @@ top_builddir = ../../../.. include $(top_builddir)/src/Makefile.global OBJS = allpaths.o clausesel.o costsize.o equivclass.o indxpath.o \ - joinpath.o joinrels.o pathkeys.o tidpath.o + joinpath.o joinrels.o pathkeys.o parallelpath.o tidpath.o include $(top_srcdir)/src/backend/common.mk diff --git a/src/backend/optimizer/path/allpaths.c b/src/backend/optimizer/path/allpaths.c index 58d78e6..528727c 100644 --- a/src/backend/optimizer/path/allpaths.c +++ b/src/backend/optimizer/path/allpaths.c @@ -410,6 +410,9 @@ set_plain_rel_pathlist(PlannerInfo *root, RelOptInfo *rel, RangeTblEntry *rte) /* Consider sequential scan */ add_path(rel, create_seqscan_path(root, rel, required_outer)); + /* Consider parallel scans */ + create_parallelscan_paths(root, rel); + /* Consider index scans */ create_index_paths(root, rel); diff --git a/src/backend/optimizer/path/costsize.c b/src/backend/optimizer/path/costsize.c index 78ef229..5f5980f 100644 --- a/src/backend/optimizer/path/costsize.c +++ b/src/backend/optimizer/path/costsize.c @@ -11,6 +11,9 @@ * cpu_tuple_cost Cost of typical CPU time to process a tuple * cpu_index_tuple_cost Cost of typical CPU time to process an index tuple * cpu_operator_cost Cost of CPU time to execute an operator or function + * cpu_tuple_comm_cost Cost of CPU time to pass a tuple from worker to master backend + * parallel_setup_cost Cost of setting up shared memory for parallelism + * parallel_startup_cost Cost of starting up parallel workers * * We expect that the kernel will typically do some amount of read-ahead * optimization; this in conjunction with seek costs means that seq_page_cost @@ -101,11 +104,16 @@ double random_page_cost = DEFAULT_RANDOM_PAGE_COST; double cpu_tuple_cost = DEFAULT_CPU_TUPLE_COST; double cpu_index_tuple_cost = DEFAULT_CPU_INDEX_TUPLE_COST; double cpu_operator_cost = DEFAULT_CPU_OPERATOR_COST; +double cpu_tuple_comm_cost = DEFAULT_CPU_TUPLE_COMM_COST; +double parallel_setup_cost = DEFAULT_PARALLEL_SETUP_COST; +double parallel_startup_cost = DEFAULT_PARALLEL_STARTUP_COST; int effective_cache_size = DEFAULT_EFFECTIVE_CACHE_SIZE; Cost disable_cost = 1.0e10; +int parallel_seqscan_degree = 0; + bool enable_seqscan = true; bool enable_indexscan = true; bool enable_indexonlyscan = true; @@ -219,6 +227,55 @@ cost_seqscan(Path *path, PlannerInfo *root, } /* + * cost_funnel + * Determines and returns the cost of scanning a relation parallely. 
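+ *
+ * The cost model assumed here: the subpath's run cost is shared
+ * evenly among the master and nWorkers workers, and we then add
+ * parallel_setup_cost, parallel_startup_cost for each worker, and
+ * cpu_tuple_comm_cost for every tuple of the relation, on the
+ * assumption that all of them are passed back to the master.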
+ * + * 'baserel' is the relation to be scanned + * 'param_info' is the ParamPathInfo if this is a parameterized path, else NULL + */ +void +cost_funnel(FunnelPath *path, PlannerInfo *root, + RelOptInfo *baserel, ParamPathInfo *param_info, + int nWorkers) +{ + Cost startup_cost = 0; + Cost run_cost = 0; + + /* Should only be applied to base relations */ + Assert(baserel->relid > 0); + Assert(baserel->rtekind == RTE_RELATION); + + /* Mark the path with the correct row estimate */ + if (param_info) + path->path.rows = param_info->ppi_rows; + else + path->path.rows = baserel->rows; + + startup_cost = path->subpath->startup_cost; + + run_cost = path->subpath->total_cost - path->subpath->startup_cost; + + /* + * Runtime cost will be equally shared by all workers. + * Here assumption is that disk access cost will also be + * equally shared between workers which is generally true + * unless there are too many workers working on a relatively + * lesser number of blocks. If we come across any such case, + * then we can think of changing the current cost model for + * parallel sequiantial scan. + */ + run_cost = run_cost / (nWorkers + 1); + + /* Parallel setup and communication cost. */ + startup_cost += parallel_setup_cost; + startup_cost += parallel_startup_cost * nWorkers; + run_cost += cpu_tuple_comm_cost * baserel->tuples; + + path->path.startup_cost = startup_cost; + path->path.total_cost = (startup_cost + run_cost); +} + +/* * cost_index * Determines and returns the cost of scanning a relation using an index. * diff --git a/src/backend/optimizer/path/parallelpath.c b/src/backend/optimizer/path/parallelpath.c new file mode 100644 index 0000000..3149247 --- /dev/null +++ b/src/backend/optimizer/path/parallelpath.c @@ -0,0 +1,121 @@ +/*------------------------------------------------------------------------- + * + * parallelpath.c + * Routines to determine which conditions are usable for scanning + * a given relation, and create ParallelPaths accordingly. + * + * + * Portions Copyright (c) 1996-2014, PostgreSQL Global Development Group + * Portions Copyright (c) 1994, Regents of the University of California + * + * + * IDENTIFICATION + * src/backend/optimizer/path/parallelpath.c + * + *------------------------------------------------------------------------- + */ +#include "postgres.h" + +#include "access/heapam.h" +#include "nodes/relation.h" +#include "optimizer/cost.h" +#include "optimizer/pathnode.h" +#include "optimizer/paths.h" +#include "optimizer/restrictinfo.h" +#include "optimizer/clauses.h" +#include "parser/parsetree.h" +#include "utils/rel.h" + + +/* + * check_simple_qual - + * Check if qual is made only of simple things we can + * hand out directly to backend worker for execution. + * + * XXX - Currently we don't allow to push an expression + * if it contains volatile function, however eventually we + * need a mechanism (proisparallel) with which we can distinquish + * the functions that can be pushed for execution by parallel + * worker. + */ +static bool +check_simple_qual(Node *node) +{ + if (node == NULL) + return TRUE; + + if (contain_volatile_functions(node)) + return FALSE; + + return TRUE; +} + +/* + * create_parallelscan_paths + * Create paths corresponding to parallel scans of the given rel. + * Currently we only support parallel sequential scan. + * + * Candidate paths are added to the rel's pathlist (using add_path). 
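+ *
+ * We build a PartialSeqScan path to act as the per-worker subplan and
+ * wrap it in a FunnelPath that the master backend executes; the worker
+ * count is capped at the relation's page count so that each worker has
+ * at least one page to scan.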
+ */ +void +create_parallelscan_paths(PlannerInfo *root, RelOptInfo *rel) +{ + int num_parallel_workers = 0; + Oid reloid; + Relation relation; + Path *subpath; + + /* + * parallel scan is possible only if user has set + * parallel_seqscan_degree to value greater than 0. + */ + if (parallel_seqscan_degree <= 0) + return; + + /* + * parallel scan is not supported for joins. + */ + if (root->simple_rel_array_size > 2) + return; + + /* parallel scan is supportted only for Select statements. */ + if (root->parse->commandType != CMD_SELECT) + return; + + reloid = planner_rt_fetch(rel->relid, root)->relid; + + relation = heap_open(reloid, NoLock); + + /* + * Temporary relations can't be scanned by parallel workers as + * they are visible only to local sessions. + */ + if (RelationUsesLocalBuffers(relation)) + { + heap_close(relation, NoLock); + return; + } + + heap_close(relation, NoLock); + + /* + * parallel scan is not supported for mutable functions + */ + if (!check_simple_qual((Node*) extract_actual_clauses(rel->baserestrictinfo, false))) + return; + + /* + * There should be atleast one page to scan for each worker. + */ + if (parallel_seqscan_degree <= rel->pages) + num_parallel_workers = parallel_seqscan_degree; + else + num_parallel_workers = rel->pages; + + /* Create the partial scan path which each worker needs to execute. */ + subpath = create_partialseqscan_path(root, rel, false); + + /* Create the parallel scan path which master needs to execute. */ + add_path(rel, (Path *) create_funnel_path(root, rel, subpath, + num_parallel_workers)); +} diff --git a/src/backend/optimizer/plan/createplan.c b/src/backend/optimizer/plan/createplan.c index 76ba1bf..744e652 100644 --- a/src/backend/optimizer/plan/createplan.c +++ b/src/backend/optimizer/plan/createplan.c @@ -58,6 +58,11 @@ static Material *create_material_plan(PlannerInfo *root, MaterialPath *best_path static Plan *create_unique_plan(PlannerInfo *root, UniquePath *best_path); static SeqScan *create_seqscan_plan(PlannerInfo *root, Path *best_path, List *tlist, List *scan_clauses); +static Scan *create_partialseqscan_plan(PlannerInfo *root, Path *best_path, + List *tlist, List *scan_clauses); +static Scan *create_funnel_plan(PlannerInfo *root, + FunnelPath *best_path, + List *tlist, List *scan_clauses); static Scan *create_indexscan_plan(PlannerInfo *root, IndexPath *best_path, List *tlist, List *scan_clauses, bool indexonly); static BitmapHeapScan *create_bitmap_scan_plan(PlannerInfo *root, @@ -100,6 +105,12 @@ static List *order_qual_clauses(PlannerInfo *root, List *clauses); static void copy_path_costsize(Plan *dest, Path *src); static void copy_plan_costsize(Plan *dest, Plan *src); static SeqScan *make_seqscan(List *qptlist, List *qpqual, Index scanrelid); +static PartialSeqScan *make_partialseqscan(List *qptlist, + List *qpqual, + Index scanrelid); +static Funnel *make_funnel(List *qptlist, List *qpqual, + Index scanrelid, int nworkers, + Plan *subplan); static IndexScan *make_indexscan(List *qptlist, List *qpqual, Index scanrelid, Oid indexid, List *indexqual, List *indexqualorig, List *indexorderby, List *indexorderbyorig, @@ -228,6 +239,8 @@ create_plan_recurse(PlannerInfo *root, Path *best_path) switch (best_path->pathtype) { case T_SeqScan: + case T_PartialSeqScan: + case T_Funnel: case T_IndexScan: case T_IndexOnlyScan: case T_BitmapHeapScan: @@ -343,6 +356,20 @@ create_scan_plan(PlannerInfo *root, Path *best_path) scan_clauses); break; + case T_PartialSeqScan: + plan = (Plan *) create_partialseqscan_plan(root, + best_path, 
+ tlist, + scan_clauses); + break; + + case T_Funnel: + plan = (Plan *) create_funnel_plan(root, + (FunnelPath *) best_path, + tlist, + scan_clauses); + break; + case T_IndexScan: plan = (Plan *) create_indexscan_plan(root, (IndexPath *) best_path, @@ -546,6 +573,8 @@ disuse_physical_tlist(PlannerInfo *root, Plan *plan, Path *path) switch (path->pathtype) { case T_SeqScan: + case T_Funnel: + case T_PartialSeqScan: case T_IndexScan: case T_IndexOnlyScan: case T_BitmapHeapScan: @@ -1133,6 +1162,87 @@ create_seqscan_plan(PlannerInfo *root, Path *best_path, } /* + * create_partialseqscan_plan + * + * Returns a partial seqscan plan for the base relation scanned by + * 'best_path' with restriction clauses 'scan_clauses' and targetlist + * 'tlist'. + */ +static Scan * +create_partialseqscan_plan(PlannerInfo *root, Path *best_path, + List *tlist, List *scan_clauses) +{ + Scan *scan_plan; + Index scan_relid = best_path->parent->relid; + + /* it should be a base rel... */ + Assert(scan_relid > 0); + Assert(best_path->path.parent->rtekind == RTE_RELATION); + + /* Sort clauses into best execution order */ + scan_clauses = order_qual_clauses(root, scan_clauses); + + /* Reduce RestrictInfo list to bare expressions; ignore pseudoconstants */ + scan_clauses = extract_actual_clauses(scan_clauses, false); + + /* Replace any outer-relation variables with nestloop params */ + if (best_path->param_info) + { + scan_clauses = (List *) + replace_nestloop_params(root, (Node *) scan_clauses); + } + + scan_plan = (Scan *) make_partialseqscan(tlist, + scan_clauses, + scan_relid); + + copy_path_costsize(&scan_plan->plan, best_path); + + return scan_plan; +} + +/* + * create_funnel_plan + * + * Returns a funnel plan for the base relation scanned by + * 'best_path' with restriction clauses 'scan_clauses' and targetlist + * 'tlist'. + */ +static Scan * +create_funnel_plan(PlannerInfo *root, FunnelPath *best_path, + List *tlist, List *scan_clauses) +{ + Scan *scan_plan; + Plan *subplan; + Index scan_relid = best_path->path.parent->relid; + + /* it should be a base rel... */ + Assert(scan_relid > 0); + Assert(best_path->path.parent->rtekind == RTE_RELATION); + + subplan = create_plan_recurse(root, best_path->subpath); + + /* + * quals for subplan and top level plan are same + * as either all the quals are pushed to subplan + * (partialseqscan plan) or parallel plan won't be + * choosen. + */ + scan_plan = (Scan *) make_funnel(tlist, + subplan->qual, + scan_relid, + best_path->num_workers, + subplan); + + copy_path_costsize(&scan_plan->plan, &best_path->path); + + /* use parallel mode for parallel plans. */ + root->glob->parallelModeNeeded = true; + + return scan_plan; +} + +/* * create_indexscan_plan * Returns an indexscan plan for the base relation scanned by 'best_path' * with restriction clauses 'scan_clauses' and targetlist 'tlist'. 
@@ -3318,6 +3428,45 @@ make_seqscan(List *qptlist, return node; } +static PartialSeqScan * +make_partialseqscan(List *qptlist, + List *qpqual, + Index scanrelid) +{ + PartialSeqScan *node = makeNode(PartialSeqScan); + Plan *plan = &node->plan; + + /* cost should be inserted by caller */ + plan->targetlist = qptlist; + plan->qual = qpqual; + plan->lefttree = NULL; + plan->righttree = NULL; + node->scanrelid = scanrelid; + + return node; +} + +static Funnel * +make_funnel(List *qptlist, + List *qpqual, + Index scanrelid, + int nworkers, + Plan *subplan) +{ + Funnel *node = makeNode(Funnel); + Plan *plan = &node->scan.plan; + + /* cost should be inserted by caller */ + plan->targetlist = qptlist; + plan->qual = qpqual; + plan->lefttree = subplan; + plan->righttree = NULL; + node->scan.scanrelid = scanrelid; + node->num_workers = nworkers; + + return node; +} + static IndexScan * make_indexscan(List *qptlist, List *qpqual, diff --git a/src/backend/optimizer/plan/planner.c b/src/backend/optimizer/plan/planner.c index b02a107..182c70d 100644 --- a/src/backend/optimizer/plan/planner.c +++ b/src/backend/optimizer/plan/planner.c @@ -260,6 +260,50 @@ standard_planner(Query *parse, int cursorOptions, ParamListInfo boundParams) return result; } +PlannedStmt * +create_worker_scan_plannedstmt(PartialSeqScan *partialscan, List *rangetable) +{ + PlannedStmt *result; + ListCell *tlist; + + /* + * Avoid removing junk entries in worker as those are + * required by upper nodes in master backend. + */ + foreach(tlist, partialscan->plan.targetlist) + { + TargetEntry *tle = (TargetEntry *) lfirst(tlist); + + tle->resjunk = false; + } + + /* build the PlannedStmt result */ + result = makeNode(PlannedStmt); + + result->commandType = CMD_SELECT; + result->queryId = 0; + result->hasReturning = 0; + result->hasModifyingCTE = 0; + result->canSetTag = 1; + result->transientPlan = 0; + result->planTree = (Plan*) partialscan; + result->rtable = rangetable; + result->resultRelations = NIL; + result->utilityStmt = NULL; + result->subplans = NIL; + result->rewindPlanIDs = NULL; + result->rowMarks = NIL; + result->nParamExec = 0; + /* + * Don't bother to set parameters used for invalidation as + * worker backend plans are not saved, so can't be invalidated. + */ + result->relationOids = NIL; + result->invalItems = NIL; + result->hasRowSecurity = false; + + return result; +} /*-------------------- * subquery_planner diff --git a/src/backend/optimizer/plan/setrefs.c b/src/backend/optimizer/plan/setrefs.c index ec828cd..1b63f23 100644 --- a/src/backend/optimizer/plan/setrefs.c +++ b/src/backend/optimizer/plan/setrefs.c @@ -435,6 +435,7 @@ set_plan_refs(PlannerInfo *root, Plan *plan, int rtoffset) switch (nodeTag(plan)) { case T_SeqScan: + case T_PartialSeqScan: { SeqScan *splan = (SeqScan *) plan; @@ -445,6 +446,24 @@ set_plan_refs(PlannerInfo *root, Plan *plan, int rtoffset) fix_scan_list(root, splan->plan.qual, rtoffset); } break; + case T_Funnel: + { + Funnel *splan = (Funnel *) plan; + + splan->scan.scanrelid += rtoffset; + splan->scan.plan.targetlist = + fix_scan_list(root, splan->scan.plan.targetlist, rtoffset); + splan->scan.plan.qual = + fix_scan_list(root, splan->scan.plan.qual, rtoffset); + + /* + * target list for partial sequence scan (leftree of funnel scan) + * should be same as for funnel scan as both nodes need to produce + * same projection. 
+ */ + splan->scan.plan.lefttree->targetlist = splan->scan.plan.targetlist; + } + break; case T_IndexScan: { IndexScan *splan = (IndexScan *) plan; diff --git a/src/backend/optimizer/plan/subselect.c b/src/backend/optimizer/plan/subselect.c index 5a1d539..8ea91ec 100644 --- a/src/backend/optimizer/plan/subselect.c +++ b/src/backend/optimizer/plan/subselect.c @@ -2163,6 +2163,8 @@ finalize_plan(PlannerInfo *root, Plan *plan, Bitmapset *valid_params, break; case T_SeqScan: + case T_PartialSeqScan: + case T_Funnel: context.paramids = bms_add_members(context.paramids, scan_params); break; diff --git a/src/backend/optimizer/util/pathnode.c b/src/backend/optimizer/util/pathnode.c index 1395a21..c1ffe78 100644 --- a/src/backend/optimizer/util/pathnode.c +++ b/src/backend/optimizer/util/pathnode.c @@ -706,6 +706,53 @@ create_seqscan_path(PlannerInfo *root, RelOptInfo *rel, Relids required_outer) } /* + * create_partialseqscan_path + * Creates a path corresponding to a partial sequential scan, returning the + * pathnode. + */ +Path * +create_partialseqscan_path(PlannerInfo *root, RelOptInfo *rel, Relids required_outer) +{ + Path *pathnode = makeNode(Path); + + pathnode->pathtype = T_PartialSeqScan; + pathnode->parent = rel; + pathnode->param_info = get_baserel_parampathinfo(root, rel, + false); + pathnode->pathkeys = NIL; /* seqscan has unordered result */ + + cost_seqscan(pathnode, root, rel, pathnode->param_info); + + return pathnode; +} + +/* + * create_funnel_path + * + * Creates a path corresponding to a funnel scan, returning the + * pathnode. + */ +FunnelPath * +create_funnel_path(PlannerInfo *root, RelOptInfo *rel, + Path* subpath, int nWorkers) +{ + FunnelPath *pathnode = makeNode(FunnelPath); + + pathnode->path.pathtype = T_Funnel; + pathnode->path.parent = rel; + pathnode->path.param_info = get_baserel_parampathinfo(root, rel, + false); + pathnode->path.pathkeys = NIL; /* seqscan has unordered result */ + + pathnode->subpath = subpath; + pathnode->num_workers = nWorkers; + + cost_funnel(pathnode, root, rel, pathnode->path.param_info, nWorkers); + + return pathnode; +} + +/* * create_index_path * Creates a path node for an index scan. * diff --git a/src/backend/postmaster/Makefile b/src/backend/postmaster/Makefile index 71c2321..f056bd5 100644 --- a/src/backend/postmaster/Makefile +++ b/src/backend/postmaster/Makefile @@ -12,7 +12,8 @@ subdir = src/backend/postmaster top_builddir = ../../.. include $(top_builddir)/src/Makefile.global -OBJS = autovacuum.o bgworker.o bgwriter.o checkpointer.o fork_process.o \ - pgarch.o pgstat.o postmaster.o startup.o syslogger.o walwriter.o +OBJS = autovacuum.o backendworker.o bgworker.o bgwriter.o checkpointer.o \ + fork_process.o pgarch.o pgstat.o postmaster.o startup.o syslogger.o \ + walwriter.o include $(top_srcdir)/src/backend/common.mk diff --git a/src/backend/postmaster/backendworker.c b/src/backend/postmaster/backendworker.c new file mode 100644 index 0000000..0c38e60 --- /dev/null +++ b/src/backend/postmaster/backendworker.c @@ -0,0 +1,410 @@ +/*------------------------------------------------------------------------- + * + * backendworker.c + * Support routines for setting up backend workers. 
+ * + * Portions Copyright (c) 1996-2015, PostgreSQL Global Development Group + * Portions Copyright (c) 1994, Regents of the University of California + * + * + * IDENTIFICATION + * src/backend/postmaster/backendworker.c + * + *------------------------------------------------------------------------- + */ +/* + * INTERFACE ROUTINES + * InitializeParallelWorkers Setup dynamic shared memory and parallel backend workers. + */ +#include "postgres.h" + +#include "access/xact.h" +#include "access/parallel.h" +#include "commands/dbcommands.h" +#include "commands/async.h" +#include "executor/nodeFunnel.h" +#include "miscadmin.h" +#include "nodes/parsenodes.h" +#include "optimizer/planmain.h" +#include "optimizer/planner.h" +#include "postmaster/backendworker.h" +#include "storage/ipc.h" +#include "storage/procsignal.h" +#include "storage/procarray.h" +#include "storage/shm_toc.h" +#include "storage/spin.h" +#include "tcop/tcopprot.h" +#include "utils/builtins.h" +#include "utils/memutils.h" +#include "utils/resowner.h" + + +#define PARALLEL_TUPLE_QUEUE_SIZE 65536 + +static void ParallelQueryMain(dsm_segment *seg, shm_toc *toc); +static void +EstimateParallelSupportInfoSpace(ParallelContext *pcxt, ParamListInfo params, + int instOptions, Size *params_size); +static void +StoreParallelSupportInfo(ParallelContext *pcxt, ParamListInfo params, + int instOptions, int params_size, + char **inst_options_space); +static void +EstimatePartialSeqScanSpace(ParallelContext *pcxt, EState *estate, + char *plannedstmt_str, Size *plannedstmt_len, + Size *pscan_size); +static void +StorePartialSeqScan(ParallelContext *pcxt, EState *estate, Relation rel, + char *plannedstmt_str, ParallelHeapScanDesc *pscan, + Size plannedstmt_size, Size pscan_size); +static void EstimateResponseQueueSpace(ParallelContext *pcxt); +static void +StoreResponseQueue(ParallelContext *pcxt, + shm_mq_handle ***responseqp); +static void +GetPlannedStmt(shm_toc *toc, PlannedStmt **plannedstmt); +static void +GetParallelSupportInfo(shm_toc *toc, ParamListInfo *params, + int *inst_options, char **instrument); +static void +SetupResponseQueue(dsm_segment *seg, shm_toc *toc, shm_mq **mq, + shm_mq_handle **responseq); + + +/* + * EstimateParallelSupportInfoSpace + * + * Estimate the amount of space required to record information of + * bind parameters and instrumentation information that need to be + * retrieved from parallel workers. + */ +void +EstimateParallelSupportInfoSpace(ParallelContext *pcxt, ParamListInfo params, + int instOptions, Size *params_size) +{ + *params_size = EstimateBoundParametersSpace(params); + shm_toc_estimate_chunk(&pcxt->estimator, *params_size); + + /* account for instrumentation options. */ + shm_toc_estimate_chunk(&pcxt->estimator, sizeof(int)); + + /* + * We expect each worker to populate the instrumentation structure + * allocated by master backend and then master backend will aggregate + * all the information, so account it for each worker. + */ + if (instOptions) + { + shm_toc_estimate_chunk(&pcxt->estimator, + sizeof(Instrumentation) * pcxt->nworkers); + /* keys for parallel support information. */ + shm_toc_estimate_keys(&pcxt->estimator, 1); + } + + /* keys for parallel support information. */ + shm_toc_estimate_keys(&pcxt->estimator, 2); +} + +/* + * StoreParallelSupportInfo + * + * Sets up the bind parameters and instrumentation information + * required for parallel execution. 
+ */ +void +StoreParallelSupportInfo(ParallelContext *pcxt, ParamListInfo params, + int instOptions, int params_size, + char **inst_options_space) +{ + char *paramsdata; + int *inst_options; + + /* + * Store bind parameter's list in dynamic shared memory. This is + * used for parameters in prepared query. + */ + paramsdata = shm_toc_allocate(pcxt->toc, params_size); + SerializeBoundParams(params, params_size, paramsdata); + shm_toc_insert(pcxt->toc, PARALLEL_KEY_PARAMS, paramsdata); + + /* Store instrument options in dynamic shared memory. */ + inst_options = shm_toc_allocate(pcxt->toc, sizeof(int)); + *inst_options = instOptions; + shm_toc_insert(pcxt->toc, PARALLEL_KEY_INST_OPTIONS, inst_options); + + /* + * Allocate space for instrumentation information to be filled by + * each worker. + */ + if (instOptions) + { + *inst_options_space = + shm_toc_allocate(pcxt->toc, sizeof(Instrumentation) * pcxt->nworkers); + shm_toc_insert(pcxt->toc, PARALLEL_KEY_INST_INFO, *inst_options_space); + } +} + +/* + * EstimatePartialSeqScanSpace + * + * Estimate the amount of space required to record information of + * planned statement and parallel heap scan descriptor that need + * to be copied to parallel workers. + */ +void +EstimatePartialSeqScanSpace(ParallelContext *pcxt, EState *estate, + char *plannedstmt_str, Size *plannedstmt_len, + Size *pscan_size) +{ + /* Estimate space for partial seq. scan specific contents. */ + *plannedstmt_len = strlen(plannedstmt_str) + 1; + shm_toc_estimate_chunk(&pcxt->estimator, *plannedstmt_len); + + *pscan_size = heap_parallelscan_estimate(estate->es_snapshot); + shm_toc_estimate_chunk(&pcxt->estimator, *pscan_size); + + /* keys for parallel support information. */ + shm_toc_estimate_keys(&pcxt->estimator, 2); +} + +/* + * StorePartialSeqScan + * + * Sets up the planned statement and block range for parallel + * sequence scan. + */ +void +StorePartialSeqScan(ParallelContext *pcxt, EState *estate, Relation rel, + char *plannedstmt_str, ParallelHeapScanDesc *pscan, + Size plannedstmt_size, Size pscan_size) +{ + char *plannedstmtdata; + + /* Store range table list in dynamic shared memory. */ + plannedstmtdata = shm_toc_allocate(pcxt->toc, plannedstmt_size); + memcpy(plannedstmtdata, plannedstmt_str, plannedstmt_size); + shm_toc_insert(pcxt->toc, PARALLEL_KEY_PLANNEDSTMT, plannedstmtdata); + + /* Store parallel heap scan descriptor in dynamic shared memory. */ + *pscan = shm_toc_allocate(pcxt->toc, pscan_size); + heap_parallelscan_initialize(*pscan, rel, estate->es_snapshot); + shm_toc_insert(pcxt->toc, PARALLEL_KEY_SCAN, *pscan); +} + +/* + * EstimateResponseQueueSpace + * + * Estimate the amount of space required to record information of + * tuple queues that need to be established between parallel workers + * and master backend. + */ +void +EstimateResponseQueueSpace(ParallelContext *pcxt) +{ + /* Estimate space for parallel seq. scan specific contents. */ + shm_toc_estimate_chunk(&pcxt->estimator, + (Size) PARALLEL_TUPLE_QUEUE_SIZE * pcxt->nworkers); + + /* keys for response queue. */ + shm_toc_estimate_keys(&pcxt->estimator, 1); +} + +/* + * StoreResponseQueue + * + * It sets up the response queue's for backend worker's to + * return tuples to the main backend and start the workers. + */ +void +StoreResponseQueue(ParallelContext *pcxt, + shm_mq_handle ***responseqp) +{ + shm_mq *mq; + char *tuple_queue_space; + int i; + + /* Allocate memory for shared memory queue handles. 
*/
+	*responseqp = (shm_mq_handle**) palloc(pcxt->nworkers * sizeof(shm_mq_handle*));
+
+	/*
+	 * Establish one message queue per worker in dynamic shared memory.
+	 * These queues should be used to transmit tuple data.
+	 */
+	tuple_queue_space =
+		shm_toc_allocate(pcxt->toc, PARALLEL_TUPLE_QUEUE_SIZE * pcxt->nworkers);
+	for (i = 0; i < pcxt->nworkers; ++i)
+	{
+		mq = shm_mq_create(tuple_queue_space + i * PARALLEL_TUPLE_QUEUE_SIZE,
+						   (Size) PARALLEL_TUPLE_QUEUE_SIZE);
+
+		shm_mq_set_receiver(mq, MyProc);
+
+		/*
+		 * Attach the queue before launching a worker, so that we'll
+		 * automatically detach the queue if we error out.  (Otherwise, the
+		 * worker might sit there trying to write the queue long after we've
+		 * gone away.)
+		 */
+		(*responseqp)[i] = shm_mq_attach(mq, pcxt->seg, NULL);
+	}
+	shm_toc_insert(pcxt->toc, PARALLEL_KEY_TUPLE_QUEUE, tuple_queue_space);
+}
+
+/*
+ * InitializeParallelWorkers
+ *
+ * Sets up the required infrastructure for backend workers to
+ * perform execution and return results to the main backend.
+ */
+void
+InitializeParallelWorkers(Plan *plan, EState *estate, Relation rel,
+						  char **inst_options_space,
+						  shm_mq_handle ***responseqp, ParallelContext **pcxtp,
+						  ParallelHeapScanDesc *pscan, int nWorkers)
+{
+	bool		already_in_parallel_mode = IsInParallelMode();
+	Size		params_size, pscan_size, plannedstmt_size;
+	char	   *plannedstmt_str;
+	PlannedStmt *plannedstmt;
+	ParallelContext *pcxt;
+
+	if (!already_in_parallel_mode)
+		EnterParallelMode();
+
+	pcxt = CreateParallelContext(ParallelQueryMain, nWorkers);
+
+	plannedstmt = create_worker_scan_plannedstmt((PartialSeqScan *) plan,
+												 estate->es_range_table);
+	plannedstmt_str = nodeToString(plannedstmt);
+
+	EstimatePartialSeqScanSpace(pcxt, estate, plannedstmt_str,
+								&plannedstmt_size, &pscan_size);
+	EstimateParallelSupportInfoSpace(pcxt, estate->es_param_list_info,
+									 estate->es_instrument, &params_size);
+	EstimateResponseQueueSpace(pcxt);
+
+	InitializeParallelDSM(pcxt);
+
+	StorePartialSeqScan(pcxt, estate, rel, plannedstmt_str,
+						pscan, plannedstmt_size, pscan_size);
+
+	StoreParallelSupportInfo(pcxt, estate->es_param_list_info,
+							 estate->es_instrument,
+							 params_size, inst_options_space);
+	StoreResponseQueue(pcxt, responseqp);
+
+	/* Return results to caller. */
+	*pcxtp = pcxt;
+}
+
+/*
+ * GetParallelSupportInfo
+ *
+ * Look up the keys in the dynamic shared memory segment and get the
+ * bind parameters and instrumentation information required to perform
+ * the parallel operation.
+ */
+void
+GetParallelSupportInfo(shm_toc *toc, ParamListInfo *params,
+					   int *inst_options, char **instrument)
+{
+	char	   *paramsdata;
+	char	   *inst_options_space;
+	int		   *instoptions;
+
+	paramsdata = shm_toc_lookup(toc, PARALLEL_KEY_PARAMS);
+	instoptions = shm_toc_lookup(toc, PARALLEL_KEY_INST_OPTIONS);
+
+	*params = RestoreBoundParams(paramsdata);
+
+	*inst_options = *instoptions;
+	if (*inst_options)
+	{
+		inst_options_space = shm_toc_lookup(toc, PARALLEL_KEY_INST_INFO);
+		*instrument = (inst_options_space +
+					   ParallelWorkerNumber * sizeof(Instrumentation));
+	}
+}
+
+/*
+ * GetPlannedStmt
+ *
+ * Look up the key in the dynamic shared memory segment and get the
+ * planned statement required to perform the parallel operation.
+ */
+void
+GetPlannedStmt(shm_toc *toc, PlannedStmt **plannedstmt)
+{
+	char	   *plannedstmtdata;
+
+	plannedstmtdata = shm_toc_lookup(toc, PARALLEL_KEY_PLANNEDSTMT);
+
+	*plannedstmt = (PlannedStmt *) stringToNode(plannedstmtdata);
+
+	/* Fill in opfuncid values if missing */
+	fix_opfuncids((Node *) (*plannedstmt)->planTree->qual);
+	fix_opfuncids((Node *) (*plannedstmt)->planTree->targetlist);
+}
+
+/*
+ * SetupResponseQueue
+ *
+ * Look up the tuple queue information for this particular worker in the
+ * dynamic shared memory segment, attach to the queue, and redirect all
+ * further responses from the worker backend via that queue.
+ */
+void
+SetupResponseQueue(dsm_segment *seg, shm_toc *toc, shm_mq **mq,
+				   shm_mq_handle **responseq)
+{
+	char	   *tuple_queue_space;
+
+	tuple_queue_space = shm_toc_lookup(toc, PARALLEL_KEY_TUPLE_QUEUE);
+	*mq = (shm_mq *) (tuple_queue_space +
+					  ParallelWorkerNumber * PARALLEL_TUPLE_QUEUE_SIZE);
+
+	shm_mq_set_sender(*mq, MyProc);
+	*responseq = shm_mq_attach(*mq, seg, NULL);
+}
+
+/*
+ * ParallelQueryMain
+ *
+ * Execute the operation and return the tuples or other information
+ * to the parallelism-driving node.
+ */
+void
+ParallelQueryMain(dsm_segment *seg, shm_toc *toc)
+{
+	shm_mq	   *mq;
+	shm_mq_handle *responseq;
+	PlannedStmt *plannedstmt;
+	ParamListInfo params;
+	int			inst_options;
+	char	   *instrument = NULL;
+	ParallelStmt *parallelstmt;
+
+	SetupResponseQueue(seg, toc, &mq, &responseq);
+
+	GetPlannedStmt(toc, &plannedstmt);
+	GetParallelSupportInfo(toc, &params, &inst_options, &instrument);
+
+	parallelstmt = palloc(sizeof(ParallelStmt));
+
+	parallelstmt->plannedstmt = plannedstmt;
+	parallelstmt->params = params;
+	parallelstmt->inst_options = inst_options;
+	parallelstmt->instrument = instrument;
+	parallelstmt->toc = toc;
+	parallelstmt->responseq = responseq;
+
+	/* Execute the worker command. */
+	exec_parallel_stmt(parallelstmt);
+
+	/*
+	 * Once we are done sending tuples, detach from the shared memory
+	 * message queue used to send them.
+	 */
+	shm_mq_detach(mq);
+}
diff --git a/src/backend/postmaster/postmaster.c b/src/backend/postmaster/postmaster.c
index ac431e5..4c303dd 100644
--- a/src/backend/postmaster/postmaster.c
+++ b/src/backend/postmaster/postmaster.c
@@ -103,6 +103,7 @@
 #include "miscadmin.h"
 #include "pg_getopt.h"
 #include "pgstat.h"
+#include "optimizer/cost.h"
 #include "postmaster/autovacuum.h"
 #include "postmaster/bgworker_internals.h"
 #include "postmaster/fork_process.h"
@@ -835,6 +836,12 @@ PostmasterMain(int argc, char *argv[])
 		ereport(ERROR,
 				(errmsg("WAL streaming (max_wal_senders > 0) requires wal_level \"archive\", \"hot_standby\", or \"logical\"")));
 
+	if (parallel_seqscan_degree >= MaxConnections)
+	{
+		write_stderr("%s: parallel_seqscan_degree must be less than max_connections\n", progname);
+		ExitPostmaster(1);
+	}
+
 	/*
 	 * Other one-time internal sanity checks can go here, if they are fast.
 	 * (Put any slow processing further down, after postmaster.pid creation.)
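InitializeParallelWorkers() above follows the usual dynamic-shared-memory discipline: each piece of state is first measured (the Estimate*Space routines call shm_toc_estimate_chunk and shm_toc_estimate_keys), the segment is sized and created once by InitializeParallelDSM(), and only then are the chunks carved out and registered under the PARALLEL_KEY_* keys by the Store* routines. The stand-alone mock below only illustrates that two-phase accounting; mock_estimator, estimate_chunk and estimate_keys are invented names, and the serialized-plan and scan-descriptor sizes are placeholders, not values taken from the patch.

#include <stdio.h>
#include <stddef.h>

typedef struct mock_estimator
{
	size_t		space;			/* bytes requested so far */
	int			keys;			/* table-of-contents entries so far */
} mock_estimator;

static void
estimate_chunk(mock_estimator *e, size_t sz)
{
	e->space += sz;
}

static void
estimate_keys(mock_estimator *e, int n)
{
	e->keys += n;
}

int
main(void)
{
	mock_estimator est = {0, 0};
	int			nworkers = 4;
	size_t		tuple_queue_size = 65536;	/* PARALLEL_TUPLE_QUEUE_SIZE */

	/* phase 1: measure everything that will live in the segment */
	estimate_chunk(&est, 512);			/* serialized PlannedStmt (varies) */
	estimate_chunk(&est, 128);			/* parallel heap scan descriptor */
	estimate_keys(&est, 2);
	estimate_chunk(&est, 64);			/* bound parameters */
	estimate_chunk(&est, sizeof(int));	/* instrumentation options */
	estimate_keys(&est, 2);
	estimate_chunk(&est, tuple_queue_size * nworkers);	/* response queues */
	estimate_keys(&est, 1);

	/* phase 2 would create the segment once, then carve out the chunks */
	printf("segment needs at least %zu bytes and %d toc keys for %d workers\n",
		   est.space, est.keys, nworkers);
	return 0;
}

The real shm_toc estimator also accounts for alignment and for the table-of-contents entries themselves, which this mock ignores.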
diff --git a/src/backend/tcop/dest.c b/src/backend/tcop/dest.c index bcf3895..7a9ce3e 100644 --- a/src/backend/tcop/dest.c +++ b/src/backend/tcop/dest.c @@ -34,6 +34,7 @@ #include "commands/createas.h" #include "commands/matview.h" #include "executor/functions.h" +#include "executor/tqueue.h" #include "executor/tstoreReceiver.h" #include "libpq/libpq.h" #include "libpq/pqformat.h" @@ -129,6 +130,9 @@ CreateDestReceiver(CommandDest dest) case DestTransientRel: return CreateTransientRelDestReceiver(InvalidOid); + + case DestTupleQueue: + return CreateTupleQueueDestReceiver(); } /* should never get here */ @@ -162,6 +166,7 @@ EndCommand(const char *commandTag, CommandDest dest) case DestCopyOut: case DestSQLFunction: case DestTransientRel: + case DestTupleQueue: break; } } @@ -204,6 +209,7 @@ NullCommand(CommandDest dest) case DestCopyOut: case DestSQLFunction: case DestTransientRel: + case DestTupleQueue: break; } } @@ -248,6 +254,7 @@ ReadyForQuery(CommandDest dest) case DestCopyOut: case DestSQLFunction: case DestTransientRel: + case DestTupleQueue: break; } } diff --git a/src/backend/tcop/postgres.c b/src/backend/tcop/postgres.c index ea2a432..17f322f 100644 --- a/src/backend/tcop/postgres.c +++ b/src/backend/tcop/postgres.c @@ -42,6 +42,7 @@ #include "catalog/pg_type.h" #include "commands/async.h" #include "commands/prepare.h" +#include "executor/tqueue.h" #include "libpq/libpq.h" #include "libpq/pqformat.h" #include "libpq/pqsignal.h" @@ -55,6 +56,7 @@ #include "pg_getopt.h" #include "postmaster/autovacuum.h" #include "postmaster/postmaster.h" +#include "postmaster/backendworker.h" #include "replication/slot.h" #include "replication/walsender.h" #include "rewrite/rewriteHandler.h" @@ -1191,6 +1193,80 @@ exec_simple_query(const char *query_string) } /* + * exec_parallel_stmt + * + * Execute the plan for backend worker. + */ +void +exec_parallel_stmt(ParallelStmt *parallelstmt) +{ + DestReceiver *receiver; + QueryDesc *queryDesc; + MemoryContext oldcontext; + MemoryContext plancontext; + + set_ps_display("SELECT", false); + + /* + * Unlike exec_simple_query(), in backend worker we won't allow + * transaction control statements, so we can allow plancontext + * to be created in TopTransaction context. + */ + plancontext = AllocSetContextCreate(CurrentMemoryContext, + "worker plan", + ALLOCSET_DEFAULT_MINSIZE, + ALLOCSET_DEFAULT_INITSIZE, + ALLOCSET_DEFAULT_MAXSIZE); + + oldcontext = MemoryContextSwitchTo(plancontext); + + if (parallelstmt->inst_options) + receiver = None_Receiver; + else + { + receiver = CreateDestReceiver(DestTupleQueue); + SetTupleQueueDestReceiverParams(receiver, parallelstmt->responseq); + } + + /* Create a QueryDesc for the query */ + queryDesc = CreateQueryDesc(parallelstmt->plannedstmt, "", + GetActiveSnapshot(), InvalidSnapshot, + receiver, parallelstmt->params, + parallelstmt->inst_options); + + queryDesc->toc = parallelstmt->toc; + + PushActiveSnapshot(queryDesc->snapshot); + + /* call ExecutorStart to prepare the plan for execution */ + ExecutorStart(queryDesc, 0); + + /* run the plan */ + ExecutorRun(queryDesc, ForwardScanDirection, 0L); + + /* run cleanup too */ + ExecutorFinish(queryDesc); + + /* + * copy intrumentation information into shared memory if requested + * by master backend. 
+ */ + if (parallelstmt->inst_options) + memcpy(parallelstmt->instrument, + queryDesc->planstate->instrument, + sizeof(Instrumentation)); + + ExecutorEnd(queryDesc); + + PopActiveSnapshot(); + + FreeQueryDesc(queryDesc); + + if (!parallelstmt->inst_options) + (*receiver->rDestroy) (receiver); +} + +/* * exec_parse_message * * Execute a "Parse" protocol message. diff --git a/src/backend/tcop/pquery.c b/src/backend/tcop/pquery.c index 9c14e8a..0bbc67b 100644 --- a/src/backend/tcop/pquery.c +++ b/src/backend/tcop/pquery.c @@ -80,6 +80,7 @@ CreateQueryDesc(PlannedStmt *plannedstmt, qd->params = params; /* parameter values passed into query */ qd->instrument_options = instrument_options; /* instrumentation * wanted? */ + qd->toc = NULL; /* need to be set by the caller before ExecutorStart */ /* null these fields until set by ExecutorStart */ qd->tupDesc = NULL; diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c index 791543e..abc2b8f 100644 --- a/src/backend/utils/misc/guc.c +++ b/src/backend/utils/misc/guc.c @@ -608,6 +608,8 @@ const char *const config_group_names[] = gettext_noop("Statistics / Query and Index Statistics Collector"), /* AUTOVACUUM */ gettext_noop("Autovacuum"), + /* PARALLEL_QUERY */ + gettext_noop("parallel_seqscan_degree"), /* CLIENT_CONN */ gettext_noop("Client Connection Defaults"), /* CLIENT_CONN_STATEMENT */ @@ -2537,6 +2539,16 @@ static struct config_int ConfigureNamesInt[] = }, { + {"parallel_seqscan_degree", PGC_SUSET, PARALLEL_QUERY, + gettext_noop("Sets the maximum number of simultaneously running backend worker processes."), + NULL + }, + ¶llel_seqscan_degree, + 0, 0, MAX_BACKENDS, + NULL, NULL, NULL + }, + + { {"autovacuum_work_mem", PGC_SIGHUP, RESOURCES_MEM, gettext_noop("Sets the maximum memory to be used by each autovacuum worker process."), NULL, @@ -2724,6 +2736,36 @@ static struct config_real ConfigureNamesReal[] = DEFAULT_CPU_OPERATOR_COST, 0, DBL_MAX, NULL, NULL, NULL }, + { + {"cpu_tuple_comm_cost", PGC_USERSET, QUERY_TUNING_COST, + gettext_noop("Sets the planner's estimate of the cost of " + "passing each tuple (row) from worker to master backend."), + NULL + }, + &cpu_tuple_comm_cost, + DEFAULT_CPU_TUPLE_COMM_COST, 0, DBL_MAX, + NULL, NULL, NULL + }, + { + {"parallel_setup_cost", PGC_USERSET, QUERY_TUNING_COST, + gettext_noop("Sets the planner's estimate of the cost of " + "setting up environment (shared memory) for parallelism."), + NULL + }, + ¶llel_setup_cost, + DEFAULT_PARALLEL_SETUP_COST, 0, DBL_MAX, + NULL, NULL, NULL + }, + { + {"parallel_startup_cost", PGC_USERSET, QUERY_TUNING_COST, + gettext_noop("Sets the planner's estimate of the cost of " + "starting parallel workers."), + NULL + }, + ¶llel_startup_cost, + DEFAULT_PARALLEL_STARTUP_COST, 0, DBL_MAX, + NULL, NULL, NULL + }, { {"cursor_tuple_fraction", PGC_USERSET, QUERY_TUNING_OTHER, diff --git a/src/backend/utils/misc/postgresql.conf.sample b/src/backend/utils/misc/postgresql.conf.sample index f8f9ce1..fbe6042 100644 --- a/src/backend/utils/misc/postgresql.conf.sample +++ b/src/backend/utils/misc/postgresql.conf.sample @@ -290,6 +290,9 @@ #cpu_tuple_cost = 0.01 # same scale as above #cpu_index_tuple_cost = 0.005 # same scale as above #cpu_operator_cost = 0.0025 # same scale as above +#cpu_tuple_comm_cost = 0.1 # same scale as above +#parallel_setup_cost = 0.0 # same scale as above +#parallel_startup_cost = 0.0 # same scale as above #effective_cache_size = 4GB # - Genetic Query Optimizer - @@ -500,6 +503,11 @@ # autovacuum, -1 means use # vacuum_cost_limit 
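The three cost GUCs added above give the planner a price for returning tuples to the master: with the defaults (cpu_tuple_comm_cost = 0.1 against cpu_tuple_cost = 0.01), handing a tuple from a worker to the master is treated as ten times more expensive than processing it locally. The exact cost_funnel() formula is not shown in this patch excerpt, so the stand-alone arithmetic below is only an illustration of that ratio, with made-up row counts and selectivities.

#include <stdio.h>

int
main(void)
{
	double		cpu_tuple_cost = 0.01;		/* existing default, for comparison */
	double		cpu_tuple_comm_cost = 0.1;	/* default added by this patch */
	double		rows = 1000000.0;			/* made-up table size */
	double		sels[] = {0.001, 0.01, 0.1, 1.0};
	int			i;

	for (i = 0; i < 4; i++)
	{
		double		returned = rows * sels[i];

		/* per-tuple CPU versus per-tuple transfer; I/O and quals ignored */
		printf("selectivity %.3f: local cpu %.0f, worker-to-master transfer %.0f\n",
			   sels[i], rows * cpu_tuple_cost, returned * cpu_tuple_comm_cost);
	}
	return 0;
}

The pattern this is meant to show: the fewer rows that survive the pushed-down quals and have to cross the queues, the more attractive the Funnel plan becomes.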
+#------------------------------------------------------------------------------
+# PARALLEL_QUERY PARAMETERS
+#------------------------------------------------------------------------------
+
+#parallel_seqscan_degree = 0		# max number of worker backend subprocesses
 
 #------------------------------------------------------------------------------
 # CLIENT CONNECTION DEFAULTS
diff --git a/src/include/access/shmmqam.h b/src/include/access/shmmqam.h
new file mode 100644
index 0000000..80d06ac
--- /dev/null
+++ b/src/include/access/shmmqam.h
@@ -0,0 +1,36 @@
+/*-------------------------------------------------------------------------
+ *
+ * shmmqam.h
+ *	  POSTGRES shared memory queue access method definitions.
+ *
+ *
+ * Portions Copyright (c) 1996-2014, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * src/include/access/shmmqam.h
+ *
+ *-------------------------------------------------------------------------
+ */
+#ifndef SHMMQAM_H
+#define SHMMQAM_H
+
+#include "access/relscan.h"
+#include "executor/tqueue.h"
+#include "libpq/pqmq.h"
+
+
+/* Private state maintained across calls to shm_getnext.
*/ +typedef struct worker_result_state +{ + bool all_workers_done; + bool local_scan_done; +} worker_result_state; + +typedef struct worker_result_state *worker_result; + +extern worker_result ExecInitWorkerResult(void); +extern HeapTuple shm_getnext(HeapScanDesc scanDesc, worker_result resultState, + TupleQueueFunnel *funnel, ScanDirection direction, + bool *fromheap); + +#endif /* SHMMQAM_H */ diff --git a/src/include/executor/execdesc.h b/src/include/executor/execdesc.h index a2381cd..56b7c75 100644 --- a/src/include/executor/execdesc.h +++ b/src/include/executor/execdesc.h @@ -42,6 +42,7 @@ typedef struct QueryDesc DestReceiver *dest; /* the destination for tuple output */ ParamListInfo params; /* param values being passed in */ int instrument_options; /* OR of InstrumentOption flags */ + shm_toc *toc; /* to fetch the information from dsm */ /* These fields are set by ExecutorStart */ TupleDesc tupDesc; /* descriptor for result tuples */ diff --git a/src/include/executor/instrument.h b/src/include/executor/instrument.h index 1c3b2b0..e8522fe 100644 --- a/src/include/executor/instrument.h +++ b/src/include/executor/instrument.h @@ -69,5 +69,6 @@ extern Instrumentation *InstrAlloc(int n, int instrument_options); extern void InstrStartNode(Instrumentation *instr); extern void InstrStopNode(Instrumentation *instr, double nTuples); extern void InstrEndLoop(Instrumentation *instr); +extern void InstrAggNode(Instrumentation *instr1, Instrumentation *instr2); #endif /* INSTRUMENT_H */ diff --git a/src/include/executor/nodeFunnel.h b/src/include/executor/nodeFunnel.h new file mode 100644 index 0000000..df7e11e --- /dev/null +++ b/src/include/executor/nodeFunnel.h @@ -0,0 +1,24 @@ +/*------------------------------------------------------------------------- + * + * nodefunnel.h + * + * + * + * Portions Copyright (c) 1996-2015, PostgreSQL Global Development Group + * Portions Copyright (c) 1994, Regents of the University of California + * + * src/include/executor/nodeFunnel.h + * + *------------------------------------------------------------------------- + */ +#ifndef NODEFUNNEL_H +#define NODEFUNNEL_H + +#include "nodes/execnodes.h" + +extern FunnelState *ExecInitFunnel(Funnel *node, EState *estate, int eflags); +extern TupleTableSlot *ExecFunnel(FunnelState *node); +extern void ExecEndFunnel(FunnelState *node); + + +#endif /* NODEFUNNEL_H */ diff --git a/src/include/executor/nodePartialSeqscan.h b/src/include/executor/nodePartialSeqscan.h new file mode 100644 index 0000000..f02bcca --- /dev/null +++ b/src/include/executor/nodePartialSeqscan.h @@ -0,0 +1,23 @@ +/*------------------------------------------------------------------------- + * + * nodePartialSeqscan.h + * + * + * + * Portions Copyright (c) 1996-2014, PostgreSQL Global Development Group + * Portions Copyright (c) 1994, Regents of the University of California + * + * src/include/executor/nodePartialSeqscan.h + * + *------------------------------------------------------------------------- + */ +#ifndef NODEPARTIALSEQSCAN_H +#define NODEPARTIALSEQSCAN_H + +#include "nodes/execnodes.h" + +extern PartialSeqScanState *ExecInitPartialSeqScan(PartialSeqScan *node, EState *estate, int eflags); +extern TupleTableSlot *ExecPartialSeqScan(PartialSeqScanState *node); +extern void ExecEndPartialSeqScan(PartialSeqScanState *node); + +#endif /* NODEPARTIALSEQSCAN_H */ diff --git a/src/include/executor/tqueue.h b/src/include/executor/tqueue.h new file mode 100644 index 0000000..c979233 --- /dev/null +++ b/src/include/executor/tqueue.h @@ -0,0 +1,34 
@@ +/*------------------------------------------------------------------------- + * + * tqueue.h + * Use shm_mq to send & receive tuples between parallel backends + * + * + * Portions Copyright (c) 1996-2015, PostgreSQL Global Development Group + * Portions Copyright (c) 1994, Regents of the University of California + * + * src/include/executor/tqueue.h + * + *------------------------------------------------------------------------- + */ + +#ifndef TQUEUE_H +#define TQUEUE_H + +#include "storage/shm_mq.h" +#include "tcop/dest.h" + +/* Use this to send tuples to a shm_mq. */ +extern DestReceiver *CreateTupleQueueDestReceiver(void); +extern void SetTupleQueueDestReceiverParams(DestReceiver *self, + shm_mq_handle *handle); + +/* Use these to receive tuples from a shm_mq. */ +typedef struct TupleQueueFunnel TupleQueueFunnel; +extern TupleQueueFunnel *CreateTupleQueueFunnel(void); +extern void DestroyTupleQueueFunnel(TupleQueueFunnel *funnel); +extern void RegisterTupleQueueOnFunnel(TupleQueueFunnel *, shm_mq_handle *); +extern HeapTuple TupleQueueFunnelNext(TupleQueueFunnel *, bool nowait, + bool *done); + +#endif /* TQUEUE_H */ diff --git a/src/include/executor/tuptable.h b/src/include/executor/tuptable.h index 48f84bf..e5dec1e 100644 --- a/src/include/executor/tuptable.h +++ b/src/include/executor/tuptable.h @@ -127,6 +127,8 @@ typedef struct TupleTableSlot MinimalTuple tts_mintuple; /* minimal tuple, or NULL if none */ HeapTupleData tts_minhdr; /* workspace for minimal-tuple-only case */ long tts_off; /* saved state for slot_deform_tuple */ + bool tts_fromheap; /* indicates whether the tuple is fetched from + heap or shrared memory message queue */ } TupleTableSlot; #define TTS_HAS_PHYSICAL_TUPLE(slot) \ diff --git a/src/include/nodes/execnodes.h b/src/include/nodes/execnodes.h index 59b17f3..323b35b 100644 --- a/src/include/nodes/execnodes.h +++ b/src/include/nodes/execnodes.h @@ -16,9 +16,13 @@ #include "access/genam.h" #include "access/heapam.h" +#include "access/parallel.h" +#include "access/shmmqam.h" #include "executor/instrument.h" +#include "executor/tqueue.h" #include "nodes/params.h" #include "nodes/plannodes.h" +#include "storage/shm_mq.h" #include "utils/reltrigger.h" #include "utils/sortsupport.h" #include "utils/tuplestore.h" @@ -389,6 +393,12 @@ typedef struct EState List *es_auxmodifytables; /* List of secondary ModifyTableStates */ /* + * This is required for parallel plan execution to fetch the + * information from dsm. + */ + shm_toc *toc; + + /* * this ExprContext is for per-output-tuple operations, such as constraint * checks and index-value computations. It will be reset for each output * tuple. Note that it will be created only if needed. @@ -1213,6 +1223,29 @@ typedef struct ScanState typedef ScanState SeqScanState; /* + * PartialSeqScan uses a bare SeqScanState as its state node, since + * it needs no additional fields. + */ +typedef SeqScanState PartialSeqScanState; + +/* + * FunnelState extends ScanState by storing additional information + * related to parallel workers. 
+ * dsm_segment dynamic shared memory segment to setup worker queues + * responseq shared memory queues to receive data from workers + */ +typedef struct FunnelState +{ + ScanState ss; /* its first field is NodeTag */ + ParallelContext *pcxt; + shm_mq_handle **responseq; + worker_result pss_workerResult; + TupleQueueFunnel *funnel; + char *inst_options_space; + bool fs_workersReady; +} FunnelState; + +/* * These structs store information about index quals that don't have simple * constant right-hand sides. See comments for ExecIndexBuildScanKeys() * for discussion. diff --git a/src/include/nodes/nodes.h b/src/include/nodes/nodes.h index 97ef0fc..6acbe67 100644 --- a/src/include/nodes/nodes.h +++ b/src/include/nodes/nodes.h @@ -51,6 +51,8 @@ typedef enum NodeTag T_BitmapOr, T_Scan, T_SeqScan, + T_PartialSeqScan, + T_Funnel, T_IndexScan, T_IndexOnlyScan, T_BitmapIndexScan, @@ -97,6 +99,8 @@ typedef enum NodeTag T_BitmapOrState, T_ScanState, T_SeqScanState, + T_PartialSeqScanState, + T_FunnelState, T_IndexScanState, T_IndexOnlyScanState, T_BitmapIndexScanState, @@ -217,6 +221,7 @@ typedef enum NodeTag T_IndexOptInfo, T_ParamPathInfo, T_Path, + T_FunnelPath, T_IndexPath, T_BitmapHeapPath, T_BitmapAndPath, diff --git a/src/include/nodes/params.h b/src/include/nodes/params.h index a0f7dd0..65b60a0 100644 --- a/src/include/nodes/params.h +++ b/src/include/nodes/params.h @@ -103,4 +103,9 @@ typedef struct ParamExecData /* Functions found in src/backend/nodes/params.c */ extern ParamListInfo copyParamList(ParamListInfo from); +extern Size +EstimateBoundParametersSpace(ParamListInfo params); +extern void +SerializeBoundParams(ParamListInfo params, Size maxsize, char *start_address); +extern ParamListInfo RestoreBoundParams(char *start_address); #endif /* PARAMS_H */ diff --git a/src/include/nodes/parsenodes.h b/src/include/nodes/parsenodes.h index ac13302..ea8e240 100644 --- a/src/include/nodes/parsenodes.h +++ b/src/include/nodes/parsenodes.h @@ -20,9 +20,16 @@ #ifndef PARSENODES_H #define PARSENODES_H +#include "executor/instrument.h" #include "nodes/bitmapset.h" +#include "nodes/params.h" +#include "nodes/plannodes.h" #include "nodes/primnodes.h" #include "nodes/value.h" +#include "nodes/params.h" +#include "storage/block.h" +#include "storage/shm_toc.h" +#include "storage/shm_mq.h" #include "utils/lockwaitpolicy.h" /* Possible sources of a Query */ @@ -156,6 +163,16 @@ typedef struct Query * depends on to be semantically valid */ } Query; +/* worker statement required for parallel execution. 
*/ +typedef struct ParallelStmt +{ + PlannedStmt *plannedstmt; + ParamListInfo params; + shm_toc *toc; + shm_mq_handle *responseq; + int inst_options; + char *instrument; +} ParallelStmt; /**************************************************************************** * Supporting data structures for Parse Trees diff --git a/src/include/nodes/plannodes.h b/src/include/nodes/plannodes.h index f6683f0..8099f78 100644 --- a/src/include/nodes/plannodes.h +++ b/src/include/nodes/plannodes.h @@ -18,6 +18,8 @@ #include "lib/stringinfo.h" #include "nodes/bitmapset.h" #include "nodes/primnodes.h" +#include "storage/block.h" +#include "storage/shm_toc.h" #include "utils/lockwaitpolicy.h" @@ -279,6 +281,22 @@ typedef struct Scan typedef Scan SeqScan; /* ---------------- + * partial sequential scan node + * ---------------- + */ +typedef SeqScan PartialSeqScan; + +/* ---------------- + * parallel sequential scan node + * ---------------- + */ +typedef struct Funnel +{ + Scan scan; + int num_workers; +} Funnel; + +/* ---------------- * index scan node * * indexqualorig is an implicitly-ANDed list of index qual expressions, each diff --git a/src/include/nodes/relation.h b/src/include/nodes/relation.h index 6845a40..df1ab5e 100644 --- a/src/include/nodes/relation.h +++ b/src/include/nodes/relation.h @@ -103,6 +103,8 @@ typedef struct PlannerGlobal bool hasRowSecurity; /* row security applied? */ + bool parallelModeNeeded; /* parallel plans need parallelmode */ + } PlannerGlobal; /* macro for fetching the Plan associated with a SubPlan node */ @@ -737,6 +739,13 @@ typedef struct Path /* pathkeys is a List of PathKey nodes; see above */ } Path; +typedef struct FunnelPath +{ + Path path; + Path *subpath; /* path for each worker */ + int num_workers; +} FunnelPath; + /* Macro for extracting a path's parameterization relids; beware double eval */ #define PATH_REQ_OUTER(path) \ ((path)->param_info ? (path)->param_info->ppi_req_outer : (Relids) NULL) diff --git a/src/include/optimizer/cost.h b/src/include/optimizer/cost.h index 9c2000b..11f0409 100644 --- a/src/include/optimizer/cost.h +++ b/src/include/optimizer/cost.h @@ -26,6 +26,14 @@ #define DEFAULT_CPU_TUPLE_COST 0.01 #define DEFAULT_CPU_INDEX_TUPLE_COST 0.005 #define DEFAULT_CPU_OPERATOR_COST 0.0025 +#define DEFAULT_CPU_TUPLE_COMM_COST 0.1 +/* + * XXX - We need some experiments to know what could be + * appropriate default values for parallel setup and startup + * cost. 
+ */ +#define DEFAULT_PARALLEL_SETUP_COST 0.0 +#define DEFAULT_PARALLEL_STARTUP_COST 0.0 #define DEFAULT_EFFECTIVE_CACHE_SIZE 524288 /* measured in pages */ @@ -48,8 +56,12 @@ extern PGDLLIMPORT double random_page_cost; extern PGDLLIMPORT double cpu_tuple_cost; extern PGDLLIMPORT double cpu_index_tuple_cost; extern PGDLLIMPORT double cpu_operator_cost; +extern PGDLLIMPORT double cpu_tuple_comm_cost; +extern PGDLLIMPORT double parallel_setup_cost; +extern PGDLLIMPORT double parallel_startup_cost; extern PGDLLIMPORT int effective_cache_size; extern Cost disable_cost; +extern int parallel_seqscan_degree; extern bool enable_seqscan; extern bool enable_indexscan; extern bool enable_indexonlyscan; @@ -68,6 +80,8 @@ extern double index_pages_fetched(double tuples_fetched, BlockNumber pages, double index_pages, PlannerInfo *root); extern void cost_seqscan(Path *path, PlannerInfo *root, RelOptInfo *baserel, ParamPathInfo *param_info); +extern void cost_funnel(FunnelPath *path, PlannerInfo *root, + RelOptInfo *baserel, ParamPathInfo *param_info, int nWorkers); extern void cost_index(IndexPath *path, PlannerInfo *root, double loop_count); extern void cost_bitmap_heap_scan(Path *path, PlannerInfo *root, RelOptInfo *baserel, diff --git a/src/include/optimizer/pathnode.h b/src/include/optimizer/pathnode.h index 9923f0e..7873565 100644 --- a/src/include/optimizer/pathnode.h +++ b/src/include/optimizer/pathnode.h @@ -32,6 +32,11 @@ extern bool add_path_precheck(RelOptInfo *parent_rel, extern Path *create_seqscan_path(PlannerInfo *root, RelOptInfo *rel, Relids required_outer); +extern Path * +create_partialseqscan_path(PlannerInfo *root, RelOptInfo *rel, + Relids required_outer); +extern FunnelPath *create_funnel_path(PlannerInfo *root, + RelOptInfo *rel, Path *subpath, int nWorkers); extern IndexPath *create_index_path(PlannerInfo *root, IndexOptInfo *index, List *indexclauses, diff --git a/src/include/optimizer/paths.h b/src/include/optimizer/paths.h index 6cad92e..391d519 100644 --- a/src/include/optimizer/paths.h +++ b/src/include/optimizer/paths.h @@ -46,6 +46,13 @@ extern void debug_print_rel(PlannerInfo *root, RelOptInfo *rel); #endif /* + * parallelpath.c + * routines to generate parallel scan paths + */ + +extern void create_parallelscan_paths(PlannerInfo *root, RelOptInfo *rel); + +/* * indxpath.c * routines to generate index paths */ diff --git a/src/include/optimizer/planner.h b/src/include/optimizer/planner.h index cd62aec..3b7ed92 100644 --- a/src/include/optimizer/planner.h +++ b/src/include/optimizer/planner.h @@ -14,6 +14,7 @@ #ifndef PLANNER_H #define PLANNER_H +#include "nodes/parsenodes.h" #include "nodes/plannodes.h" #include "nodes/relation.h" @@ -29,6 +30,8 @@ extern PlannedStmt *planner(Query *parse, int cursorOptions, ParamListInfo boundParams); extern PlannedStmt *standard_planner(Query *parse, int cursorOptions, ParamListInfo boundParams); +extern PlannedStmt * +create_worker_scan_plannedstmt(PartialSeqScan *partialscan, List *rangetable); extern Plan *subquery_planner(PlannerGlobal *glob, Query *parse, PlannerInfo *parent_root, diff --git a/src/include/postmaster/backendworker.h b/src/include/postmaster/backendworker.h new file mode 100644 index 0000000..1d05d79 --- /dev/null +++ b/src/include/postmaster/backendworker.h @@ -0,0 +1,39 @@ +/*-------------------------------------------------------------------- + * backendworker.h + * POSTGRES backend workers interface + * + * Portions Copyright (c) 1996-2014, PostgreSQL Global Development Group + * Portions Copyright (c) 1994, 
Regents of the University of California + * + * IDENTIFICATION + * src/include/postmaster/backendworker.h + *-------------------------------------------------------------------- + */ +#ifndef BACKENDWORKER_H +#define BACKENDWORKER_H + +/*--------------------------------------------------------------------- + * External module API. + *--------------------------------------------------------------------- + */ + +#include "libpq/pqmq.h" + +/* Table-of-contents constants for our dynamic shared memory segment. */ +#define PARALLEL_KEY_PLANNEDSTMT 0 +#define PARALLEL_KEY_PARAMS 1 +#define PARALLEL_KEY_INST_OPTIONS 2 +#define PARALLEL_KEY_INST_INFO 3 +#define PARALLEL_KEY_TUPLE_QUEUE 4 +#define PARALLEL_KEY_SCAN 5 + +extern int parallel_seqscan_degree; + +extern void InitializeParallelWorkers(Plan *plan, EState *estate, + Relation rel, char **inst_options_space, + shm_mq_handle ***responseqp, + ParallelContext **pcxtp, + ParallelHeapScanDesc *pscan, + int nWorkers); + +#endif /* BACKENDWORKER_H */ diff --git a/src/include/tcop/dest.h b/src/include/tcop/dest.h index 5bcca3f..b560672 100644 --- a/src/include/tcop/dest.h +++ b/src/include/tcop/dest.h @@ -94,7 +94,8 @@ typedef enum DestIntoRel, /* results sent to relation (SELECT INTO) */ DestCopyOut, /* results sent to COPY TO code */ DestSQLFunction, /* results sent to SQL-language func mgr */ - DestTransientRel /* results sent to transient relation */ + DestTransientRel, /* results sent to transient relation */ + DestTupleQueue /* results sent to tuple queue */ } CommandDest; /* ---------------- diff --git a/src/include/tcop/tcopprot.h b/src/include/tcop/tcopprot.h index 3e17770..489af46 100644 --- a/src/include/tcop/tcopprot.h +++ b/src/include/tcop/tcopprot.h @@ -84,5 +84,6 @@ extern void set_debug_options(int debug_flag, extern bool set_plan_disabling_options(const char *arg, GucContext context, GucSource source); extern const char *get_stats_option_name(const char *arg); +extern void exec_parallel_stmt(ParallelStmt *parallelscan); #endif /* TCOPPROT_H */ diff --git a/src/include/utils/guc_tables.h b/src/include/utils/guc_tables.h index cf319af..38855e5 100644 --- a/src/include/utils/guc_tables.h +++ b/src/include/utils/guc_tables.h @@ -85,6 +85,7 @@ enum config_group STATS_MONITORING, STATS_COLLECTOR, AUTOVACUUM, + PARALLEL_QUERY, CLIENT_CONN, CLIENT_CONN_STATEMENT, CLIENT_CONN_LOCALE,
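The PARALLEL_KEY_* constants above index the chunks that backendworker.c stores in the dynamic shared memory segment. Within the tuple-queue chunk and the instrumentation chunk, each worker then addresses its own slot purely by ParallelWorkerNumber, as SetupResponseQueue() and GetParallelSupportInfo() do. The stand-alone mock below only prints those per-worker offsets; MOCK_TUPLE_QUEUE_SIZE mirrors PARALLEL_TUPLE_QUEUE_SIZE from the patch, while MOCK_INSTRUMENT_SIZE is an invented stand-in for sizeof(Instrumentation), which is backend-internal.

#include <stdio.h>

#define MOCK_TUPLE_QUEUE_SIZE	65536	/* PARALLEL_TUPLE_QUEUE_SIZE in the patch */
#define MOCK_INSTRUMENT_SIZE	128		/* stand-in for sizeof(Instrumentation) */

int
main(void)
{
	int			nworkers = 4;
	int			i;

	/* worker i's queue and instrumentation slot are found by simple indexing */
	for (i = 0; i < nworkers; i++)
		printf("worker %d: queue offset %d, instrumentation offset %d\n",
			   i,
			   i * MOCK_TUPLE_QUEUE_SIZE,
			   i * MOCK_INSTRUMENT_SIZE);
	return 0;
}

Because every slot has a fixed size, no additional bookkeeping is needed in the segment beyond the single table-of-contents entry per chunk.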