diff --git a/src/backend/commands/explain.c b/src/backend/commands/explain.c index 1247433..cb0299a 100644 --- a/src/backend/commands/explain.c +++ b/src/backend/commands/explain.c @@ -881,6 +881,9 @@ ExplainNode(PlanState *planstate, List *ancestors, case T_Gather: pname = sname = "Gather"; break; + case T_GatherMerge: + pname = sname = "Gather Merge"; + break; case T_IndexScan: pname = sname = "Index Scan"; break; @@ -1370,6 +1373,26 @@ ExplainNode(PlanState *planstate, List *ancestors, ExplainPropertyBool("Single Copy", gather->single_copy, es); } break; + case T_GatherMerge: + { + GatherMerge *gm = (GatherMerge *) plan; + + show_scan_qual(plan->qual, "Filter", planstate, ancestors, es); + if (plan->qual) + show_instrumentation_count("Rows Removed by Filter", 1, + planstate, es); + ExplainPropertyInteger("Workers Planned", + gm->num_workers, es); + if (es->analyze) + { + int nworkers; + + nworkers = ((GatherMergeState *) planstate)->nworkers_launched; + ExplainPropertyInteger("Workers Launched", + nworkers, es); + } + } + break; case T_FunctionScan: if (es->verbose) { diff --git a/src/backend/executor/Makefile b/src/backend/executor/Makefile index 51edd4c..7e2f4e2 100644 --- a/src/backend/executor/Makefile +++ b/src/backend/executor/Makefile @@ -19,7 +19,7 @@ OBJS = execAmi.o execCurrent.o execGrouping.o execIndexing.o execJunk.o \ nodeBitmapAnd.o nodeBitmapOr.o \ nodeBitmapHeapscan.o nodeBitmapIndexscan.o nodeCustom.o nodeGather.o \ nodeHash.o nodeHashjoin.o nodeIndexscan.o nodeIndexonlyscan.o \ - nodeLimit.o nodeLockRows.o \ + nodeLimit.o nodeLockRows.o nodeGatherMerge.o \ nodeMaterial.o nodeMergeAppend.o nodeMergejoin.o nodeModifyTable.o \ nodeNestloop.o nodeFunctionscan.o nodeRecursiveunion.o nodeResult.o \ nodeSamplescan.o nodeSeqscan.o nodeSetOp.o nodeSort.o nodeUnique.o \ diff --git a/src/backend/executor/execProcnode.c b/src/backend/executor/execProcnode.c index 554244f..45b36af 100644 --- a/src/backend/executor/execProcnode.c +++ b/src/backend/executor/execProcnode.c @@ -101,6 +101,7 @@ #include "executor/nodeModifyTable.h" #include "executor/nodeNestloop.h" #include "executor/nodeGather.h" +#include "executor/nodeGatherMerge.h" #include "executor/nodeRecursiveunion.h" #include "executor/nodeResult.h" #include "executor/nodeSamplescan.h" @@ -314,6 +315,11 @@ ExecInitNode(Plan *node, EState *estate, int eflags) estate, eflags); break; + case T_GatherMerge: + result = (PlanState *) ExecInitGatherMerge((GatherMerge *) node, + estate, eflags); + break; + case T_Hash: result = (PlanState *) ExecInitHash((Hash *) node, estate, eflags); @@ -515,6 +521,10 @@ ExecProcNode(PlanState *node) result = ExecGather((GatherState *) node); break; + case T_GatherMergeState: + result = ExecGatherMerge((GatherMergeState *) node); + break; + case T_HashState: result = ExecHash((HashState *) node); break; @@ -673,6 +683,10 @@ ExecEndNode(PlanState *node) ExecEndGather((GatherState *) node); break; + case T_GatherMergeState: + ExecEndGatherMerge((GatherMergeState *) node); + break; + case T_IndexScanState: ExecEndIndexScan((IndexScanState *) node); break; @@ -806,6 +820,9 @@ ExecShutdownNode(PlanState *node) case T_GatherState: ExecShutdownGather((GatherState *) node); break; + case T_GatherMergeState: + ExecShutdownGatherMerge((GatherMergeState *) node); + break; default: break; } diff --git a/src/backend/executor/nodeGatherMerge.c b/src/backend/executor/nodeGatherMerge.c new file mode 100644 index 0000000..fd884a8 --- /dev/null +++ b/src/backend/executor/nodeGatherMerge.c @@ -0,0 +1,693 @@ +/*------------------------------------------------------------------------- + * + * nodeGatherMerge.c + * routines to handle GatherMerge nodes. + * + * Portions Copyright (c) 1996-2015, PostgreSQL Global Development Group + * Portions Copyright (c) 1994, Regents of the University of California + * + * IDENTIFICATION + * src/backend/executor/nodeGatherMerge.c + * + *------------------------------------------------------------------------- + */ +/* INTERFACE ROUTINES + * ExecInitGatherMerge - initialize the MergeAppend node + * ExecGatherMerge - retrieve the next tuple from the node + * ExecEndGatherMerge - shut down the MergeAppend node + * ExecReScanGatherMerge - rescan the MergeAppend node + */ + +#include "postgres.h" + +#include "access/relscan.h" +#include "access/xact.h" +#include "executor/execdebug.h" +#include "executor/execParallel.h" +#include "executor/nodeGatherMerge.h" +#include "executor/nodeSubplan.h" +#include "executor/tqueue.h" +#include "miscadmin.h" +#include "utils/memutils.h" +#include "utils/rel.h" +#include "lib/binaryheap.h" + +/* + * Tuple array for each worker + */ +typedef struct GMReaderTuple +{ + HeapTuple *tuple; + int readCounter; + int nTuples; + bool done; +} GMReaderTuple; + +/* Tuple array size */ +#define MAX_TUPLE_STORE 10 + +static int32 heap_compare_slots(Datum a, Datum b, void *arg); +static TupleTableSlot *gather_merge_getnext(GatherMergeState * gm_state); +static HeapTuple gm_readnext_tuple(GatherMergeState * gm_state, int nreader, bool force, bool *done); +static void gather_merge_init(GatherMergeState * gm_state); +static void ExecShutdownGatherMergeWorkers(GatherMergeState * node); +static bool gather_merge_readnext(GatherMergeState * gm_state, int reader, bool force); +static void fill_tuple_array(GatherMergeState * gm_state, int reader); + +/* ---------------------------------------------------------------- + * ExecInitGather + * ---------------------------------------------------------------- + */ +GatherMergeState * +ExecInitGatherMerge(GatherMerge * node, EState *estate, int eflags) +{ + GatherMergeState *gm_state; + Plan *outerNode; + bool hasoid; + TupleDesc tupDesc; + + /* Gather merge node doesn't have innerPlan node. */ + Assert(innerPlan(node) == NULL); + + /* + * create state structure + */ + gm_state = makeNode(GatherMergeState); + gm_state->ps.plan = (Plan *) node; + gm_state->ps.state = estate; + + /* + * Miscellaneous initialization + * + * create expression context for node + */ + ExecAssignExprContext(estate, &gm_state->ps); + + /* + * initialize child expressions + */ + gm_state->ps.targetlist = (List *) + ExecInitExpr((Expr *) node->plan.targetlist, + (PlanState *) gm_state); + gm_state->ps.qual = (List *) + ExecInitExpr((Expr *) node->plan.qual, + (PlanState *) gm_state); + + /* + * tuple table initialization + */ + gm_state->funnel_slot = ExecInitExtraTupleSlot(estate); + ExecInitResultTupleSlot(estate, &gm_state->ps); + + /* + * now initialize outer plan + */ + outerNode = outerPlan(node); + outerPlanState(gm_state) = ExecInitNode(outerNode, estate, eflags); + + gm_state->ps.ps_TupFromTlist = false; + + /* + * Initialize result tuple type and projection info. + */ + ExecAssignResultTypeFromTL(&gm_state->ps); + ExecAssignProjectionInfo(&gm_state->ps, NULL); + + gm_state->gm_initialized = false; + + /* + * initialize sort-key information + */ + if (node->numCols) + { + int i; + + gm_state->gm_nkeys = node->numCols; + gm_state->gm_sortkeys = palloc0(sizeof(SortSupportData) * node->numCols); + for (i = 0; i < node->numCols; i++) + { + SortSupport sortKey = gm_state->gm_sortkeys + i; + + sortKey->ssup_cxt = CurrentMemoryContext; + sortKey->ssup_collation = node->collations[i]; + sortKey->ssup_nulls_first = node->nullsFirst[i]; + sortKey->ssup_attno = node->sortColIdx[i]; + + /* + * We don't perform abbreviated key conversion here, for the same + * reasons that it isn't used in MergeAppend + */ + sortKey->abbreviate = false; + + PrepareSortSupportFromOrderingOp(node->sortOperators[i], sortKey); + } + } + + /* + * Initialize funnel slot to same tuple descriptor as outer plan. + */ + if (!ExecContextForcesOids(&gm_state->ps, &hasoid)) + hasoid = false; + tupDesc = ExecTypeFromTL(outerNode->targetlist, hasoid); + ExecSetSlotDescriptor(gm_state->funnel_slot, tupDesc); + + return gm_state; +} + +/* ---------------------------------------------------------------- + * ExecGatherMerge(node) + * + * Scans the relation via multiple workers and returns + * the next qualifying tuple. + * ---------------------------------------------------------------- + */ +TupleTableSlot * +ExecGatherMerge(GatherMergeState * node) +{ + TupleTableSlot *fslot = node->funnel_slot; + int i; + TupleTableSlot *slot; + TupleTableSlot *resultSlot; + ExprDoneCond isDone; + ExprContext *econtext; + + /* + * Initialize the parallel context and workers on first execution. We do + * this on first execution rather than during node initialization, as it + * needs to allocate large dynamic segment, so it is better to do if it is + * really needed. + */ + if (!node->initialized) + { + EState *estate = node->ps.state; + GatherMerge *gm = (GatherMerge *) node->ps.plan; + + /* + * Sometimes we might have to run without parallelism; but if parallel + * mode is active then we can try to fire up some workers. + */ + if (gm->num_workers > 0 && IsInParallelMode()) + { + ParallelContext *pcxt; + bool got_any_worker = false; + + /* Initialize the workers required to execute Gather node. */ + if (!node->pei) + node->pei = ExecInitParallelPlan(node->ps.lefttree, + estate, + gm->num_workers); + + /* + * Register backend workers. We might not get as many as we + * requested, or indeed any at all. + */ + pcxt = node->pei->pcxt; + LaunchParallelWorkers(pcxt); + node->nworkers_launched = pcxt->nworkers_launched; + + /* Set up tuple queue readers to read the results. */ + if (pcxt->nworkers_launched > 0) + { + node->nreaders = 0; + node->reader = + palloc(pcxt->nworkers_launched * sizeof(TupleQueueReader *)); + + Assert(gm->numCols); + + for (i = 0; i < pcxt->nworkers_launched; ++i) + { + if (pcxt->worker[i].bgwhandle == NULL) + continue; + + shm_mq_set_handle(node->pei->tqueue[i], + pcxt->worker[i].bgwhandle); + node->reader[node->nreaders] = + CreateTupleQueueReader(node->pei->tqueue[i], + fslot->tts_tupleDescriptor); + node->nreaders++; + got_any_worker = true; + } + } + + /* No workers? Then never mind. */ + if (!got_any_worker || + node->nreaders < 2) + { + ExecShutdownGatherMergeWorkers(node); + node->nreaders = 0; + } + } + + /* always allow leader to participate into gather merge */ + node->need_to_scan_locally = true; + node->initialized = true; + } + + /* + * Check to see if we're still projecting out tuples from a previous scan + * tuple (because there is a function-returning-set in the projection + * expressions). If so, try to project another one. + */ + if (node->ps.ps_TupFromTlist) + { + resultSlot = ExecProject(node->ps.ps_ProjInfo, &isDone); + if (isDone == ExprMultipleResult) + return resultSlot; + /* Done with that source tuple... */ + node->ps.ps_TupFromTlist = false; + } + + /* + * Reset per-tuple memory context to free any expression evaluation + * storage allocated in the previous tuple cycle. Note we can't do this + * until we're done projecting. + */ + econtext = node->ps.ps_ExprContext; + ResetExprContext(econtext); + + /* Get and return the next tuple, projecting if necessary. */ + for (;;) + { + /* + * Get next tuple, either from one of our workers, or by running the + * plan ourselves. + */ + slot = gather_merge_getnext(node); + if (TupIsNull(slot)) + return NULL; + + /* + * form the result tuple using ExecProject(), and return it --- unless + * the projection produces an empty set, in which case we must loop + * back around for another tuple + */ + econtext->ecxt_outertuple = slot; + resultSlot = ExecProject(node->ps.ps_ProjInfo, &isDone); + + if (isDone != ExprEndResult) + { + node->ps.ps_TupFromTlist = (isDone == ExprMultipleResult); + return resultSlot; + } + } + + return slot; +} + +/* ---------------------------------------------------------------- + * ExecEndGatherMerge + * + * frees any storage allocated through C routines. + * ---------------------------------------------------------------- + */ +void +ExecEndGatherMerge(GatherMergeState * node) +{ + ExecShutdownGatherMerge(node); + ExecFreeExprContext(&node->ps); + ExecClearTuple(node->ps.ps_ResultTupleSlot); + ExecEndNode(outerPlanState(node)); +} + +/* ---------------------------------------------------------------- + * ExecShutdownGatherMerge + * + * Destroy the setup for parallel workers including parallel context. + * Collect all the stats after workers are stopped, else some work + * done by workers won't be accounted. + * ---------------------------------------------------------------- + */ +void +ExecShutdownGatherMerge(GatherMergeState * node) +{ + ExecShutdownGatherMergeWorkers(node); + + /* Now destroy the parallel context. */ + if (node->pei != NULL) + { + ExecParallelCleanup(node->pei); + node->pei = NULL; + } +} + +/* ---------------------------------------------------------------- + * ExecReScanGatherMerge + * + * Re-initialize the workers and rescans a relation via them. + * ---------------------------------------------------------------- + */ +void +ExecReScanGatherMerge(GatherMergeState * node) +{ + /* + * Re-initialize the parallel workers to perform rescan of relation. We + * want to gracefully shutdown all the workers so that they should be able + * to propagate any error or other information to master backend before + * dying. Parallel context will be reused for rescan. + */ + ExecShutdownGatherMergeWorkers(node); + + node->initialized = false; + + if (node->pei) + ExecParallelReinitialize(node->pei); + + ExecReScan(node->ps.lefttree); +} + +/* ---------------------------------------------------------------- + * ExecShutdownGatherMergeWorkers + * + * Destroy the parallel workers. Collect all the stats after + * workers are stopped, else some work done by workers won't be + * accounted. + * ---------------------------------------------------------------- + */ +static void +ExecShutdownGatherMergeWorkers(GatherMergeState * node) +{ + /* Shut down tuple queue readers before shutting down workers. */ + if (node->reader != NULL) + { + int i; + + for (i = 0; i < node->nreaders; ++i) + if (node->reader[i]) + DestroyTupleQueueReader(node->reader[i]); + + pfree(node->reader); + node->reader = NULL; + } + + /* Now shut down the workers. */ + if (node->pei != NULL) + ExecParallelFinish(node->pei); +} + +/* + * Initialize the Gather merge tuple read. + * + * Pull atleast single tuple from each worker + leader and set up the heap. + */ +static void +gather_merge_init(GatherMergeState * gm_state) +{ + TupleTableSlot *fslot = gm_state->funnel_slot; + int nreaders = gm_state->nreaders; + bool initialize = true; + int i; + + /* + * Allocate gm_slots for the number of worker + one more slot for leader. + * Last slot is always for leader. Leader always calls ExecProcNode() to + * read the tuple which will return the TupleTableSlot. Later it will + * directly get assigned to gm_slot. So just initialize leader gm_slot + * with NULL. For other slots below code will call + * ExecInitExtraTupleSlot() which will do the initialization of worker + * slots. + */ + gm_state->gm_slots = + palloc((gm_state->nreaders + 1) * sizeof(TupleTableSlot *)); + gm_state->gm_slots[gm_state->nreaders] = NULL; + + /* Initialize the tuple slot and tuple array for each worker */ + gm_state->gm_tuple = (GMReaderTuple *) palloc0(sizeof(GMReaderTuple) * (gm_state->nreaders)); + for (i = 0; i < gm_state->nreaders; i++) + { + /* Allocate the tuple array with MAX_TUPLE_STORE size */ + gm_state->gm_tuple[i].tuple = (HeapTuple *) palloc0(sizeof(HeapTuple) * MAX_TUPLE_STORE); + + /* Initialize slot for worker */ + gm_state->gm_slots[i] = ExecInitExtraTupleSlot(gm_state->ps.state); + ExecSetSlotDescriptor(gm_state->gm_slots[i], + fslot->tts_tupleDescriptor); + } + + /* Allocate the resources for the sort */ + gm_state->gm_heap = binaryheap_allocate(gm_state->nreaders + 1, heap_compare_slots, gm_state); + + /* + * First try to read tuple for each worker (including leader) into nowait + * mode, so that we initialize read from each worker as well as leader. + * After this if all worker unable to produce the tuple, then re-read and + * this time read the tuple into wait mode. For the worker, which was able + * to produced single tuple in the earlier loop, just fill the tuple array + * if more tuples available. + */ +reread: + for (i = 0; i < nreaders + 1; i++) + { + if (TupIsNull(gm_state->gm_slots[i]) || + gm_state->gm_slots[i]->tts_isempty) + { + if (gather_merge_readnext(gm_state, i, initialize ? false : true)) + { + binaryheap_add_unordered(gm_state->gm_heap, + Int32GetDatum(i)); + } + } + else + fill_tuple_array(gm_state, i); + } + initialize = false; + + for (i = 0; i < nreaders; i++) + if (TupIsNull(gm_state->gm_slots[i]) || gm_state->gm_slots[i]->tts_isempty) + goto reread; + + binaryheap_build(gm_state->gm_heap); + gm_state->gm_initialized = true; +} + +/* + * Read the next tuple for gather merge. + * + * Function fetch the sorted tuple out of the heap. + */ +static TupleTableSlot * +gather_merge_getnext(GatherMergeState * gm_state) +{ + TupleTableSlot *fslot = gm_state->funnel_slot; + int i; + + /* + * First time through: pull the first tuple from each participate, and set + * up the heap. + */ + if (gm_state->gm_initialized == false) + gather_merge_init(gm_state); + else + { + /* + * Otherwise, pull the next tuple from whichever participate we + * returned from last time, and reinsert the index into the heap, + * because it might now compare differently against the existing + * elements of the heap. + */ + i = DatumGetInt32(binaryheap_first(gm_state->gm_heap)); + + if (gather_merge_readnext(gm_state, i, true)) + binaryheap_replace_first(gm_state->gm_heap, Int32GetDatum(i)); + else + (void) binaryheap_remove_first(gm_state->gm_heap); + } + + if (binaryheap_empty(gm_state->gm_heap)) + { + /* All the queues are exhausted, and so is the heap */ + return ExecClearTuple(fslot); + } + else + { + i = DatumGetInt32(binaryheap_first(gm_state->gm_heap)); + return gm_state->gm_slots[i]; + } + + return ExecClearTuple(fslot); +} + +/* + * Read the tuple for given reader into nowait mode, and fill the tuple array. + */ +static void +fill_tuple_array(GatherMergeState * gm_state, int reader) +{ + GMReaderTuple *gm_tuple = &gm_state->gm_tuple[reader]; + int i; + + /* Last slot is for leader and we don't build tuple array for leader */ + if (reader == gm_state->nreaders) + return; + + /* + * We here because we already read all the tuples from the tuple array, so + * initialize the counter to zero. + */ + if (gm_tuple->nTuples == gm_tuple->readCounter) + gm_tuple->nTuples = gm_tuple->readCounter = 0; + + /* Tuple array is already full? */ + if (gm_tuple->nTuples == MAX_TUPLE_STORE) + return; + + for (i = gm_tuple->nTuples; i < MAX_TUPLE_STORE; i++) + { + gm_tuple->tuple[i] = gm_readnext_tuple(gm_state, + reader, + false, + &gm_tuple->done); + if (!HeapTupleIsValid(gm_tuple->tuple[i])) + break; + gm_tuple->nTuples++; + } +} + +/* + * Function attempt to read tuple for the given reader and store it into reader + * tuple slot. + * + * If the worker tuple array contains any tuple then just read tuple from the + * tuple array. Other wise read the tuple from the queue and also attempt to + * fill the tuple array. + * + * When force is true, function reads the tuple into wait mode. For gather + * merge we need to fill the slot from which we returned the earlier tuple, so + * this require tuple to be read into wait mode. During initialization phase, + * once we try to read the tuple into no-wait mode as we want to initialize all + * the readers. Refer gather_merge_init() for more details. + * + * Function returns true if found tuple for the reader, otherwise returns + * false. + */ +static bool +gather_merge_readnext(GatherMergeState * gm_state, int reader, bool force) +{ + HeapTuple tup = NULL; + + /* We here for leader? */ + if (gm_state->nreaders == reader) + { + if (gm_state->need_to_scan_locally) + { + PlanState *outerPlan = outerPlanState(gm_state); + TupleTableSlot *outerTupleSlot; + + outerTupleSlot = ExecProcNode(outerPlan); + + if (!TupIsNull(outerTupleSlot)) + { + gm_state->gm_slots[reader] = outerTupleSlot; + return true; + } + gm_state->need_to_scan_locally = false; + } + return false; + } + /* Does tuple array have any avaiable tuples? */ + else if (gm_state->gm_tuple[reader].nTuples > + gm_state->gm_tuple[reader].readCounter) + { + GMReaderTuple *gm_tuple = &gm_state->gm_tuple[reader]; + + tup = gm_tuple->tuple[gm_tuple->readCounter++]; + } + /* reader exhausted? */ + else if (gm_state->gm_tuple[reader].done) + { + DestroyTupleQueueReader(gm_state->reader[reader]); + gm_state->reader[reader] = NULL; + return false; + } + else + { + tup = gm_readnext_tuple(gm_state, reader, force, NULL); + + /* + * try to read more tuple into nowait mode and store it into the tuple + * array. + */ + if (HeapTupleIsValid(tup)) + fill_tuple_array(gm_state, reader); + else + return false; + } + + Assert(HeapTupleIsValid(tup)); + + /* Build the TupleTableSlot for the given tuple */ + ExecStoreTuple(tup, /* tuple to store */ + gm_state->gm_slots[reader], /* slot in which to store the + * tuple */ + InvalidBuffer, /* buffer associated with this tuple */ + true); /* pfree this pointer if not from heap */ + + return true; +} + +/* + * Attempt to read a tuple from given reader. + */ +static HeapTuple +gm_readnext_tuple(GatherMergeState * gm_state, int nreader, bool force, bool *done) +{ + TupleQueueReader *reader; + HeapTuple tup = NULL; + + if (done != NULL) + *done = false; + + /* Check for async events, particularly messages from workers. */ + CHECK_FOR_INTERRUPTS(); + + /* Attempt to read a tuple. */ + reader = gm_state->reader[nreader]; + tup = TupleQueueReaderNext(reader, force ? false : true, done); + + return tup; +} + +/* + * We have one slot for each item in the heap array. We use SlotNumber + * to store slot indexes. This doesn't actually provide any formal + * type-safety, but it makes the code more self-documenting. + */ +typedef int32 SlotNumber; + +/* + * Compare the tuples in the two given slots. + */ +static int32 +heap_compare_slots(Datum a, Datum b, void *arg) +{ + GatherMergeState *node = (GatherMergeState *) arg; + SlotNumber slot1 = DatumGetInt32(a); + SlotNumber slot2 = DatumGetInt32(b); + + TupleTableSlot *s1 = node->gm_slots[slot1]; + TupleTableSlot *s2 = node->gm_slots[slot2]; + int nkey; + + Assert(!TupIsNull(s1)); + Assert(!TupIsNull(s2)); + + for (nkey = 0; nkey < node->gm_nkeys; nkey++) + { + SortSupport sortKey = node->gm_sortkeys + nkey; + AttrNumber attno = sortKey->ssup_attno; + Datum datum1, + datum2; + bool isNull1, + isNull2; + int compare; + + datum1 = slot_getattr(s1, attno, &isNull1); + datum2 = slot_getattr(s2, attno, &isNull2); + + compare = ApplySortComparator(datum1, isNull1, + datum2, isNull2, + sortKey); + if (compare != 0) + return -compare; + } + return 0; +} diff --git a/src/backend/nodes/copyfuncs.c b/src/backend/nodes/copyfuncs.c index 71714bc..8b92c1a 100644 --- a/src/backend/nodes/copyfuncs.c +++ b/src/backend/nodes/copyfuncs.c @@ -341,6 +341,31 @@ _copyGather(const Gather *from) return newnode; } +/* + * _copyGatherMerge + */ +static GatherMerge * +_copyGatherMerge(const GatherMerge *from) +{ + GatherMerge *newnode = makeNode(GatherMerge); + + /* + * copy node superclass fields + */ + CopyPlanFields((const Plan *) from, (Plan *) newnode); + + /* + * copy remainder of node + */ + COPY_SCALAR_FIELD(num_workers); + COPY_SCALAR_FIELD(numCols); + COPY_POINTER_FIELD(sortColIdx, from->numCols * sizeof(AttrNumber)); + COPY_POINTER_FIELD(sortOperators, from->numCols * sizeof(Oid)); + COPY_POINTER_FIELD(collations, from->numCols * sizeof(Oid)); + COPY_POINTER_FIELD(nullsFirst, from->numCols * sizeof(bool)); + + return newnode; +} /* * CopyScanFields @@ -4343,6 +4368,9 @@ copyObject(const void *from) case T_Gather: retval = _copyGather(from); break; + case T_GatherMerge: + retval = _copyGatherMerge(from); + break; case T_SeqScan: retval = _copySeqScan(from); break; diff --git a/src/backend/nodes/outfuncs.c b/src/backend/nodes/outfuncs.c index ae86954..5dea0f7 100644 --- a/src/backend/nodes/outfuncs.c +++ b/src/backend/nodes/outfuncs.c @@ -447,6 +447,35 @@ _outGather(StringInfo str, const Gather *node) } static void +_outGatherMerge(StringInfo str, const GatherMerge *node) +{ + int i; + + WRITE_NODE_TYPE("GATHERMERGE"); + + _outPlanInfo(str, (const Plan *) node); + + WRITE_INT_FIELD(num_workers); + WRITE_INT_FIELD(numCols); + + appendStringInfoString(str, " :sortColIdx"); + for (i = 0; i < node->numCols; i++) + appendStringInfo(str, " %d", node->sortColIdx[i]); + + appendStringInfoString(str, " :sortOperators"); + for (i = 0; i < node->numCols; i++) + appendStringInfo(str, " %u", node->sortOperators[i]); + + appendStringInfoString(str, " :collations"); + for (i = 0; i < node->numCols; i++) + appendStringInfo(str, " %u", node->collations[i]); + + appendStringInfoString(str, " :nullsFirst"); + for (i = 0; i < node->numCols; i++) + appendStringInfo(str, " %s", booltostr(node->nullsFirst[i])); +} + +static void _outScan(StringInfo str, const Scan *node) { WRITE_NODE_TYPE("SCAN"); @@ -1964,6 +1993,18 @@ _outLimitPath(StringInfo str, const LimitPath *node) } static void +_outGatherMergePath(StringInfo str, const GatherMergePath *node) +{ + WRITE_NODE_TYPE("GATHERMERGEPATH"); + + _outPathInfo(str, (const Path *) node); + + WRITE_NODE_FIELD(subpath); + WRITE_INT_FIELD(num_workers); + WRITE_BOOL_FIELD(single_copy); +} + +static void _outNestPath(StringInfo str, const NestPath *node) { WRITE_NODE_TYPE("NESTPATH"); @@ -3322,6 +3363,9 @@ outNode(StringInfo str, const void *obj) case T_Gather: _outGather(str, obj); break; + case T_GatherMerge: + _outGatherMerge(str, obj); + break; case T_Scan: _outScan(str, obj); break; @@ -3649,6 +3693,9 @@ outNode(StringInfo str, const void *obj) case T_LimitPath: _outLimitPath(str, obj); break; + case T_GatherMergePath: + _outGatherMergePath(str, obj); + break; case T_NestPath: _outNestPath(str, obj); break; diff --git a/src/backend/nodes/readfuncs.c b/src/backend/nodes/readfuncs.c index 917e6c8..77a452e 100644 --- a/src/backend/nodes/readfuncs.c +++ b/src/backend/nodes/readfuncs.c @@ -2075,6 +2075,26 @@ _readGather(void) } /* + * _readGatherMerge + */ +static GatherMerge * +_readGatherMerge(void) +{ + READ_LOCALS(GatherMerge); + + ReadCommonPlan(&local_node->plan); + + READ_INT_FIELD(num_workers); + READ_INT_FIELD(numCols); + READ_ATTRNUMBER_ARRAY(sortColIdx, local_node->numCols); + READ_OID_ARRAY(sortOperators, local_node->numCols); + READ_OID_ARRAY(collations, local_node->numCols); + READ_BOOL_ARRAY(nullsFirst, local_node->numCols); + + READ_DONE(); +} + +/* * _readHash */ static Hash * @@ -2477,6 +2497,8 @@ parseNodeString(void) return_value = _readUnique(); else if (MATCH("GATHER", 6)) return_value = _readGather(); + else if (MATCH("GATHERMERGE", 11)) + return_value = _readGatherMerge(); else if (MATCH("HASH", 4)) return_value = _readHash(); else if (MATCH("SETOP", 5)) diff --git a/src/backend/optimizer/path/costsize.c b/src/backend/optimizer/path/costsize.c index 2a49639..5dbb83e 100644 --- a/src/backend/optimizer/path/costsize.c +++ b/src/backend/optimizer/path/costsize.c @@ -126,6 +126,7 @@ bool enable_nestloop = true; bool enable_material = true; bool enable_mergejoin = true; bool enable_hashjoin = true; +bool enable_gathermerge = true; typedef struct { @@ -391,6 +392,70 @@ cost_gather(GatherPath *path, PlannerInfo *root, } /* + * cost_gather_merge + * Determines and returns the cost of gather merge path. + * + * GatherMerge merges several pre-sorted input streams, using a heap that at + * any given instant holds the next tuple from each stream. If there are N + * streams, we need about N*log2(N) tuple comparisons to construct the heap at + * startup, and then for each output tuple, about log2(N) comparisons to delete + * the top heap entry and another log2(N) comparisons to insert its successor + * from the same stream. + * + * The heap is never spilled to disk, since we assume N is not very large. So + * this is much simple then cost_sort. + */ +void +cost_gather_merge(GatherMergePath *path, PlannerInfo *root, + RelOptInfo *rel, ParamPathInfo *param_info, + Cost input_startup_cost, Cost input_total_cost) +{ + Cost startup_cost = 0; + Cost run_cost = 0; + Cost comparison_cost; + double N; + double logN; + + /* Mark the path with the correct row estimate */ + if (param_info) + path->path.rows = param_info->ppi_rows; + else + path->path.rows = path->subpath->rows; + + if (!enable_gathermerge) + startup_cost += disable_cost; + + /* + * Avoid log(0)... + */ + N = (path->num_workers < 2) ? 2.0 : (double) path->num_workers; + logN = LOG2(N); + + /* Assumed cost per tuple comparison */ + comparison_cost = 2.0 * cpu_operator_cost; + + /* Heap creation cost */ + startup_cost += comparison_cost * N * logN; + + /* Per-tuple heap maintenance cost */ + run_cost += path->path.rows * comparison_cost * 2.0 * logN; + + /* small cost for heap management, like cost_merge_append */ + run_cost += cpu_operator_cost * path->path.rows; + + /* + * Parallel setup and communication cost. For Gather Merge, require tuple + * to be read into wait mode from each worker, so considering some extra + * cost for the same. + */ + startup_cost += parallel_setup_cost; + run_cost += parallel_tuple_cost * path->path.rows; + + path->path.startup_cost = startup_cost + input_startup_cost; + path->path.total_cost = (startup_cost + run_cost + input_total_cost); +} + +/* * cost_index * Determines and returns the cost of scanning a relation using an index. * diff --git a/src/backend/optimizer/plan/createplan.c b/src/backend/optimizer/plan/createplan.c index 47158f6..96bed2e 100644 --- a/src/backend/optimizer/plan/createplan.c +++ b/src/backend/optimizer/plan/createplan.c @@ -270,6 +270,11 @@ static ModifyTable *make_modifytable(PlannerInfo *root, List *resultRelations, List *subplans, List *withCheckOptionLists, List *returningLists, List *rowMarks, OnConflictExpr *onconflict, int epqParam); +static GatherMerge *create_gather_merge_plan(PlannerInfo *root, + GatherMergePath *best_path); +static GatherMerge *make_gather_merge(List *qptlist, List *qpqual, + int nworkers, bool single_copy, + Plan *subplan); /* @@ -463,6 +468,10 @@ create_plan_recurse(PlannerInfo *root, Path *best_path, int flags) (LimitPath *) best_path, flags); break; + case T_GatherMerge: + plan = (Plan *) create_gather_merge_plan(root, + (GatherMergePath *) best_path); + break; default: elog(ERROR, "unrecognized node type: %d", (int) best_path->pathtype); @@ -2246,6 +2255,90 @@ create_limit_plan(PlannerInfo *root, LimitPath *best_path, int flags) return plan; } +/* + * create_gather_merge_plan + * + * Create a Gather merge plan for 'best_path' and (recursively) + * plans for its subpaths. + */ +static GatherMerge * +create_gather_merge_plan(PlannerInfo *root, GatherMergePath *best_path) +{ + GatherMerge *gm_plan; + Plan *subplan; + List *pathkeys = best_path->path.pathkeys; + int numsortkeys; + AttrNumber *sortColIdx; + Oid *sortOperators; + Oid *collations; + bool *nullsFirst; + + subplan = create_plan_recurse(root, best_path->subpath, CP_EXACT_TLIST); + + gm_plan = make_gather_merge(subplan->targetlist, + NIL, + best_path->num_workers, + best_path->single_copy, + subplan); + + copy_generic_path_info(&gm_plan->plan, &best_path->path); + + if (pathkeys) + { + /* Compute sort column info, and adjust GatherMerge tlist as needed */ + (void) prepare_sort_from_pathkeys(&gm_plan->plan, pathkeys, + best_path->path.parent->relids, + NULL, + true, + &gm_plan->numCols, + &gm_plan->sortColIdx, + &gm_plan->sortOperators, + &gm_plan->collations, + &gm_plan->nullsFirst); + + + /* Compute sort column info, and adjust subplan's tlist as needed */ + subplan = prepare_sort_from_pathkeys(subplan, pathkeys, + best_path->subpath->parent->relids, + gm_plan->sortColIdx, + false, + &numsortkeys, + &sortColIdx, + &sortOperators, + &collations, + &nullsFirst); + + /* + * Check that we got the same sort key information. We just Assert + * that the sortops match, since those depend only on the pathkeys; + * but it seems like a good idea to check the sort column numbers + * explicitly, to ensure the tlists really do match up. + */ + Assert(numsortkeys == gm_plan->numCols); + if (memcmp(sortColIdx, gm_plan->sortColIdx, + numsortkeys * sizeof(AttrNumber)) != 0) + elog(ERROR, "GatherMerge child's targetlist doesn't match GatherMerge"); + Assert(memcmp(sortOperators, gm_plan->sortOperators, + numsortkeys * sizeof(Oid)) == 0); + Assert(memcmp(collations, gm_plan->collations, + numsortkeys * sizeof(Oid)) == 0); + Assert(memcmp(nullsFirst, gm_plan->nullsFirst, + numsortkeys * sizeof(bool)) == 0); + + /* Now, insert a Sort node if subplan isn't sufficiently ordered */ + if (!pathkeys_contained_in(pathkeys, best_path->subpath->pathkeys)) + subplan = (Plan *) make_sort(subplan, numsortkeys, + sortColIdx, sortOperators, + collations, nullsFirst); + + gm_plan->plan.lefttree = subplan; + } + + /* use parallel mode for parallel plans. */ + root->glob->parallelModeNeeded = true; + + return gm_plan; +} /***************************************************************************** * @@ -5902,6 +5995,26 @@ make_gather(List *qptlist, return node; } +static GatherMerge * +make_gather_merge(List *qptlist, + List *qpqual, + int nworkers, + bool single_copy, + Plan *subplan) +{ + GatherMerge *node = makeNode(GatherMerge); + Plan *plan = &node->plan; + + /* cost should be inserted by caller */ + plan->targetlist = qptlist; + plan->qual = qpqual; + plan->lefttree = subplan; + plan->righttree = NULL; + node->num_workers = nworkers; + + return node; +} + /* * distinctList is a list of SortGroupClauses, identifying the targetlist * items that should be considered by the SetOp filter. The input path must diff --git a/src/backend/optimizer/plan/planner.c b/src/backend/optimizer/plan/planner.c index f657ffc..7339f03 100644 --- a/src/backend/optimizer/plan/planner.c +++ b/src/backend/optimizer/plan/planner.c @@ -3719,14 +3719,59 @@ create_grouping_paths(PlannerInfo *root, /* * Now generate a complete GroupAgg Path atop of the cheapest partial - * path. We need only bother with the cheapest path here, as the - * output of Gather is never sorted. + * path. We generate a Gather path based on the cheapest partial path, + * and a GatherMerge path for each partial path that is properly sorted. */ if (grouped_rel->partial_pathlist) { Path *path = (Path *) linitial(grouped_rel->partial_pathlist); double total_groups = path->rows * path->parallel_workers; + /* + * GatherMerge is always sorted, so if there is GROUP BY clause, + * try to generate GatherMerge path for each partial path. + */ + if (parse->groupClause) + { + foreach(lc, grouped_rel->partial_pathlist) + { + Path *gmpath = (Path *) lfirst(lc); + + if (!pathkeys_contained_in(root->group_pathkeys, gmpath->pathkeys)) + continue; + + /* create gather merge path */ + gmpath = (Path *) create_gather_merge_path(root, + grouped_rel, + gmpath, + NULL, + root->group_pathkeys, + NULL); + + if (parse->hasAggs) + add_path(grouped_rel, (Path *) + create_agg_path(root, + grouped_rel, + gmpath, + target, + parse->groupClause ? AGG_SORTED : AGG_PLAIN, + AGGSPLIT_FINAL_DESERIAL, + parse->groupClause, + (List *) parse->havingQual, + &agg_final_costs, + dNumGroups)); + else + add_path(grouped_rel, (Path *) + create_group_path(root, + grouped_rel, + gmpath, + target, + parse->groupClause, + (List *) parse->havingQual, + dNumGroups)); + } + } + path = (Path *) create_gather_path(root, grouped_rel, path, @@ -3864,6 +3909,12 @@ create_grouping_paths(PlannerInfo *root, /* Now choose the best path(s) */ set_cheapest(grouped_rel); + /* + * Partial pathlist generated for grouped relation are no further useful, + * so just reset it with null. + */ + grouped_rel->partial_pathlist = NIL; + return grouped_rel; } @@ -4160,6 +4211,36 @@ create_distinct_paths(PlannerInfo *root, } } + /* + * Generate GatherMerge path for each partial path. + */ + foreach(lc, input_rel->partial_pathlist) + { + Path *path = (Path *) lfirst(lc); + + if (!pathkeys_contained_in(needed_pathkeys, path->pathkeys)) + { + path = (Path *) create_sort_path(root, distinct_rel, + path, + needed_pathkeys, + -1.0); + } + + /* create gather merge path */ + path = (Path *) create_gather_merge_path(root, + distinct_rel, + path, + NULL, + needed_pathkeys, + NULL); + add_path(distinct_rel, (Path *) + create_upper_unique_path(root, + distinct_rel, + path, + list_length(root->distinct_pathkeys), + numDistinctRows)); + } + /* For explicit-sort case, always use the more rigorous clause */ if (list_length(root->distinct_pathkeys) < list_length(root->sort_pathkeys)) @@ -4304,6 +4385,39 @@ create_ordered_paths(PlannerInfo *root, ordered_rel->useridiscurrent = input_rel->useridiscurrent; ordered_rel->fdwroutine = input_rel->fdwroutine; + foreach(lc, input_rel->partial_pathlist) + { + Path *path = (Path *) lfirst(lc); + bool is_sorted; + + is_sorted = pathkeys_contained_in(root->sort_pathkeys, + path->pathkeys); + if (!is_sorted) + { + /* An explicit sort here can take advantage of LIMIT */ + path = (Path *) create_sort_path(root, + ordered_rel, + path, + root->sort_pathkeys, + limit_tuples); + } + + /* create gather merge path */ + path = (Path *) create_gather_merge_path(root, + ordered_rel, + path, + target, + root->sort_pathkeys, + NULL); + + /* Add projection step if needed */ + if (path->pathtarget != target) + path = apply_projection_to_path(root, ordered_rel, + path, target); + + add_path(ordered_rel, path); + } + foreach(lc, input_rel->pathlist) { Path *path = (Path *) lfirst(lc); diff --git a/src/backend/optimizer/plan/setrefs.c b/src/backend/optimizer/plan/setrefs.c index d10a983..d14db7d 100644 --- a/src/backend/optimizer/plan/setrefs.c +++ b/src/backend/optimizer/plan/setrefs.c @@ -605,6 +605,7 @@ set_plan_refs(PlannerInfo *root, Plan *plan, int rtoffset) break; case T_Gather: + case T_GatherMerge: set_upper_references(root, plan, rtoffset); break; diff --git a/src/backend/optimizer/plan/subselect.c b/src/backend/optimizer/plan/subselect.c index 263ba45..760f519 100644 --- a/src/backend/optimizer/plan/subselect.c +++ b/src/backend/optimizer/plan/subselect.c @@ -2682,6 +2682,7 @@ finalize_plan(PlannerInfo *root, Plan *plan, Bitmapset *valid_params, case T_Sort: case T_Unique: case T_Gather: + case T_GatherMerge: case T_SetOp: case T_Group: break; diff --git a/src/backend/optimizer/util/pathnode.c b/src/backend/optimizer/util/pathnode.c index abb7507..f83cd77 100644 --- a/src/backend/optimizer/util/pathnode.c +++ b/src/backend/optimizer/util/pathnode.c @@ -1630,6 +1630,66 @@ create_unique_path(PlannerInfo *root, RelOptInfo *rel, Path *subpath, } /* + * create_gather_merge_path + * + * Creates a path corresponding to a gather merge scan, returning + * the pathnode. + */ +GatherMergePath * +create_gather_merge_path(PlannerInfo *root, RelOptInfo *rel, Path *subpath, + PathTarget *target, List *pathkeys, + Relids required_outer) +{ + GatherMergePath *pathnode = makeNode(GatherMergePath); + Cost input_startup_cost = 0; + Cost input_total_cost = 0; + + Assert(subpath->parallel_safe); + Assert(pathkeys); + + pathnode->path.pathtype = T_GatherMerge; + pathnode->path.parent = rel; + pathnode->path.param_info = get_baserel_parampathinfo(root, rel, + required_outer); + pathnode->path.parallel_aware = false; + + pathnode->subpath = subpath; + pathnode->num_workers = subpath->parallel_workers; + pathnode->path.pathkeys = pathkeys; + pathnode->path.pathtarget = target ? target : rel->reltarget; + pathnode->path.rows += subpath->rows; + + if (pathkeys_contained_in(pathkeys, subpath->pathkeys)) + { + /* Subpath is adequately ordered, we won't need to sort it */ + input_startup_cost += subpath->startup_cost; + input_total_cost += subpath->total_cost; + } + else + { + /* We'll need to insert a Sort node, so include cost for that */ + Path sort_path; /* dummy for result of cost_sort */ + + cost_sort(&sort_path, + root, + pathkeys, + subpath->total_cost, + subpath->rows, + subpath->pathtarget->width, + 0.0, + work_mem, + 0 /* FIXME: pathnode->limit_tuples*/); + input_startup_cost += sort_path.startup_cost; + input_total_cost += sort_path.total_cost; + } + + cost_gather_merge(pathnode, root, rel, pathnode->path.param_info, + input_startup_cost, input_total_cost); + + return pathnode; +} + +/* * translate_sub_tlist - get subquery column numbers represented by tlist * * The given targetlist usually contains only Vars referencing the given relid. diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c index 622279b..502f17d 100644 --- a/src/backend/utils/misc/guc.c +++ b/src/backend/utils/misc/guc.c @@ -894,6 +894,15 @@ static struct config_bool ConfigureNamesBool[] = true, NULL, NULL, NULL }, + { + {"enable_gathermerge", PGC_USERSET, QUERY_TUNING_METHOD, + gettext_noop("Enables the planner's use of gather merge plans."), + NULL + }, + &enable_gathermerge, + true, + NULL, NULL, NULL + }, { {"geqo", PGC_USERSET, QUERY_TUNING_GEQO, diff --git a/src/include/executor/nodeGatherMerge.h b/src/include/executor/nodeGatherMerge.h new file mode 100644 index 0000000..58dcebf --- /dev/null +++ b/src/include/executor/nodeGatherMerge.h @@ -0,0 +1,27 @@ +/*------------------------------------------------------------------------- + * + * nodeGatherMerge.h + * prototypes for nodeGatherMerge.c + * + * + * Portions Copyright (c) 1996-2015, PostgreSQL Global Development Group + * Portions Copyright (c) 1994, Regents of the University of California + * + * src/include/executor/nodeGatherMerge.h + * + *------------------------------------------------------------------------- + */ +#ifndef NODEGATHERMERGE_H +#define NODEGATHERMERGE_H + +#include "nodes/execnodes.h" + +extern GatherMergeState *ExecInitGatherMerge(GatherMerge * node, + EState *estate, + int eflags); +extern TupleTableSlot *ExecGatherMerge(GatherMergeState * node); +extern void ExecEndGatherMerge(GatherMergeState * node); +extern void ExecReScanGatherMerge(GatherMergeState * node); +extern void ExecShutdownGatherMerge(GatherMergeState * node); + +#endif /* NODEGATHERMERGE_H */ diff --git a/src/include/nodes/execnodes.h b/src/include/nodes/execnodes.h index 4fa3661..54d929f 100644 --- a/src/include/nodes/execnodes.h +++ b/src/include/nodes/execnodes.h @@ -1963,6 +1963,33 @@ typedef struct GatherState } GatherState; /* ---------------- + * GatherMergeState information + * + * Gather merge nodes launch 1 or more parallel workers, run a + * subplan in those workers, collect the results and perform sort. + * ---------------- + */ +struct GMReaderTuple; + +typedef struct GatherMergeState +{ + PlanState ps; /* its first field is NodeTag */ + bool initialized; + struct ParallelExecutorInfo *pei; + int nreaders; + int nworkers_launched; + struct TupleQueueReader **reader; + TupleTableSlot *funnel_slot; + TupleTableSlot **gm_slots; + struct binaryheap *gm_heap; /* binary heap of slot indices */ + bool gm_initialized; /* gather merge initilized ? */ + bool need_to_scan_locally; + int gm_nkeys; + SortSupport gm_sortkeys; /* array of length ms_nkeys */ + struct GMReaderTuple *gm_tuple; /* array of lenght nreaders + leader */ +} GatherMergeState; + +/* ---------------- * HashState information * ---------------- */ diff --git a/src/include/nodes/nodes.h b/src/include/nodes/nodes.h index 88297bb..edfb917 100644 --- a/src/include/nodes/nodes.h +++ b/src/include/nodes/nodes.h @@ -75,6 +75,7 @@ typedef enum NodeTag T_WindowAgg, T_Unique, T_Gather, + T_GatherMerge, T_Hash, T_SetOp, T_LockRows, @@ -123,6 +124,7 @@ typedef enum NodeTag T_WindowAggState, T_UniqueState, T_GatherState, + T_GatherMergeState, T_HashState, T_SetOpState, T_LockRowsState, @@ -244,6 +246,7 @@ typedef enum NodeTag T_MaterialPath, T_UniquePath, T_GatherPath, + T_GatherMergePath, T_ProjectionPath, T_SortPath, T_GroupPath, diff --git a/src/include/nodes/plannodes.h b/src/include/nodes/plannodes.h index e2fbc7d..ec319bf 100644 --- a/src/include/nodes/plannodes.h +++ b/src/include/nodes/plannodes.h @@ -765,6 +765,22 @@ typedef struct Gather bool invisible; /* suppress EXPLAIN display (for testing)? */ } Gather; +/* ------------ + * gather merge node + * ------------ + */ +typedef struct GatherMerge +{ + Plan plan; + int num_workers; + /* remaining fields are just like the sort-key info in struct Sort */ + int numCols; /* number of sort-key columns */ + AttrNumber *sortColIdx; /* their indexes in the target list */ + Oid *sortOperators; /* OIDs of operators to sort them by */ + Oid *collations; /* OIDs of collations */ + bool *nullsFirst; /* NULLS FIRST/LAST directions */ +} GatherMerge; + /* ---------------- * hash build node * diff --git a/src/include/nodes/relation.h b/src/include/nodes/relation.h index 3a1255a..dfaca79 100644 --- a/src/include/nodes/relation.h +++ b/src/include/nodes/relation.h @@ -1194,6 +1194,19 @@ typedef struct GatherPath } GatherPath; /* + * GatherMergePath runs several copies of a plan in parallel and + * collects the results. FIXME: comments + */ +typedef struct GatherMergePath +{ + Path path; + Path *subpath; /* path for each worker */ + int num_workers; /* number of workers sought to help */ + bool single_copy; /* path must not be executed >1x */ +} GatherMergePath; + + +/* * All join-type paths share these fields. */ diff --git a/src/include/optimizer/cost.h b/src/include/optimizer/cost.h index 2a4df2f..cd48cc4 100644 --- a/src/include/optimizer/cost.h +++ b/src/include/optimizer/cost.h @@ -66,6 +66,7 @@ extern bool enable_nestloop; extern bool enable_material; extern bool enable_mergejoin; extern bool enable_hashjoin; +extern bool enable_gathermerge; extern int constraint_exclusion; extern double clamp_row_est(double nrows); @@ -198,5 +199,8 @@ extern Selectivity clause_selectivity(PlannerInfo *root, int varRelid, JoinType jointype, SpecialJoinInfo *sjinfo); +extern void cost_gather_merge(GatherMergePath *path, PlannerInfo *root, + RelOptInfo *rel, ParamPathInfo *param_info, + Cost input_startup_cost, Cost input_total_cost); #endif /* COST_H */ diff --git a/src/include/optimizer/pathnode.h b/src/include/optimizer/pathnode.h index 71d9154..3dbe9fc 100644 --- a/src/include/optimizer/pathnode.h +++ b/src/include/optimizer/pathnode.h @@ -267,5 +267,10 @@ extern ParamPathInfo *get_joinrel_parampathinfo(PlannerInfo *root, List **restrict_clauses); extern ParamPathInfo *get_appendrel_parampathinfo(RelOptInfo *appendrel, Relids required_outer); +extern GatherMergePath *create_gather_merge_path(PlannerInfo *root, + RelOptInfo *rel, Path *subpath, + PathTarget *target, + List *pathkeys, + Relids required_outer); #endif /* PATHNODE_H */ diff --git a/src/test/regress/expected/rangefuncs.out b/src/test/regress/expected/rangefuncs.out index f06cfa4..5c547e2 100644 --- a/src/test/regress/expected/rangefuncs.out +++ b/src/test/regress/expected/rangefuncs.out @@ -2,6 +2,7 @@ SELECT name, setting FROM pg_settings WHERE name LIKE 'enable%'; name | setting ----------------------+--------- enable_bitmapscan | on + enable_gathermerge | on enable_hashagg | on enable_hashjoin | on enable_indexonlyscan | on @@ -12,7 +13,7 @@ SELECT name, setting FROM pg_settings WHERE name LIKE 'enable%'; enable_seqscan | on enable_sort | on enable_tidscan | on -(11 rows) +(12 rows) CREATE TABLE foo2(fooid int, f2 int); INSERT INTO foo2 VALUES(1, 11);