diff --git a/src/backend/executor/execParallel.c b/src/backend/executor/execParallel.c index e01fe6d..0b50ab9 100644 --- a/src/backend/executor/execParallel.c +++ b/src/backend/executor/execParallel.c @@ -25,6 +25,7 @@ #include "executor/execParallel.h" #include "executor/executor.h" +#include "executor/nodeAppend.h" #include "executor/nodeCustom.h" #include "executor/nodeForeignscan.h" #include "executor/nodeSeqscan.h" @@ -201,6 +202,10 @@ ExecParallelEstimate(PlanState *planstate, ExecParallelEstimateContext *e) ExecForeignScanEstimate((ForeignScanState *) planstate, e->pcxt); break; + case T_AppendState: + ExecAppendEstimate((AppendState *) planstate, + e->pcxt); + break; case T_CustomScanState: ExecCustomScanEstimate((CustomScanState *) planstate, e->pcxt); @@ -249,6 +254,10 @@ ExecParallelInitializeDSM(PlanState *planstate, ExecSeqScanInitializeDSM((SeqScanState *) planstate, d->pcxt); break; + case T_AppendState: + ExecAppendInitializeDSM((AppendState *) planstate, + d->pcxt); + break; case T_ForeignScanState: ExecForeignScanInitializeDSM((ForeignScanState *) planstate, d->pcxt); @@ -725,6 +734,9 @@ ExecParallelInitializeWorker(PlanState *planstate, shm_toc *toc) case T_SeqScanState: ExecSeqScanInitializeWorker((SeqScanState *) planstate, toc); break; + case T_AppendState: + ExecAppendInitializeWorker((AppendState *) planstate, toc); + break; case T_ForeignScanState: ExecForeignScanInitializeWorker((ForeignScanState *) planstate, toc); diff --git a/src/backend/executor/nodeAppend.c b/src/backend/executor/nodeAppend.c index 6986cae..97bfc89 100644 --- a/src/backend/executor/nodeAppend.c +++ b/src/backend/executor/nodeAppend.c @@ -59,9 +59,48 @@ #include "executor/execdebug.h" #include "executor/nodeAppend.h" +#include "miscadmin.h" +#include "optimizer/cost.h" +#include "storage/spin.h" +/* + * Shared state for Parallel Append. + * + * Each backend participating in a Parallel Append has its own + * descriptor in backend-private memory, and those objects all contain + * a pointer to this structure. + */ +typedef struct ParallelAppendInfo +{ + int pa_num_workers; /* workers currently executing the subplan */ + int pa_max_workers; /* max workers that should run the subplan */ +} ParallelAppendInfo; + +typedef struct ParallelAppendDescData +{ + slock_t pa_mutex; /* mutual exclusion to choose next subplan */ + ParallelAppendInfo pa_info[FLEXIBLE_ARRAY_MEMBER]; +} ParallelAppendDescData; + +typedef ParallelAppendDescData *ParallelAppendDesc; + +/* + * Special value of AppendState->as_whichplan for Parallel Append, which + * indicates there are no plans left to be executed. + */ +#define PA_INVALID_PLAN -1 + + +static void exec_append_scan_first(AppendState *appendstate); static bool exec_append_initialize_next(AppendState *appendstate); +static void set_finished(ParallelAppendDesc padesc, int whichplan); +static bool parallel_append_next(AppendState *state); +static inline void +exec_append_scan_first(AppendState *appendstate) +{ + appendstate->as_whichplan = 0; +} /* ---------------------------------------------------------------- * exec_append_initialize_next @@ -77,6 +116,22 @@ exec_append_initialize_next(AppendState *appendstate) int whichplan; /* + * In case it's parallel-aware, follow it's own logic of choosing the next + * subplan. + */ + if (appendstate->as_padesc) + return parallel_append_next(appendstate); + + /* + * Not parallel-aware. Fine, just go on to the next subplan in the + * appropriate direction. + */ + if (ScanDirectionIsForward(appendstate->ps.state->es_direction)) + appendstate->as_whichplan++; + else + appendstate->as_whichplan--; + + /* * get information from the append node */ whichplan = appendstate->as_whichplan; @@ -178,8 +233,7 @@ ExecInitAppend(Append *node, EState *estate, int eflags) /* * initialize to scan first subplan */ - appendstate->as_whichplan = 0; - exec_append_initialize_next(appendstate); + exec_append_scan_first(appendstate); return appendstate; } @@ -198,6 +252,14 @@ ExecAppend(AppendState *node) PlanState *subnode; TupleTableSlot *result; + /* Check if we are already finished plans from parallel append */ + if (node->as_padesc && node->as_whichplan == PA_INVALID_PLAN) + { + elog(DEBUG2, "ParallelAppend : pid %d : all plans already finished", + MyProcPid); + return ExecClearTuple(node->ps.ps_ResultTupleSlot); + } + /* * figure out which subplan we are currently processing */ @@ -219,14 +281,18 @@ ExecAppend(AppendState *node) } /* - * Go on to the "next" subplan in the appropriate direction. If no - * more subplans, return the empty slot set up for us by - * ExecInitAppend. + * We are done with this subplan. There might be other workers still + * processing the last chunk of rows for this same subplan, but there's + * no point for new workers to run this subplan, so mark this subplan + * as finished. + */ + if (node->as_padesc) + set_finished(node->as_padesc, node->as_whichplan); + + /* + * Go on to the "next" subplan. If no more subplans, return the empty + * slot set up for us by ExecInitAppend. */ - if (ScanDirectionIsForward(node->ps.state->es_direction)) - node->as_whichplan++; - else - node->as_whichplan--; if (!exec_append_initialize_next(node)) return ExecClearTuple(node->ps.ps_ResultTupleSlot); @@ -270,6 +336,7 @@ ExecReScanAppend(AppendState *node) for (i = 0; i < node->as_nplans; i++) { PlanState *subnode = node->appendplans[i]; + ParallelAppendDesc padesc = node->as_padesc; /* * ExecReScan doesn't know about my subplans, so I have to do @@ -284,7 +351,223 @@ ExecReScanAppend(AppendState *node) */ if (subnode->chgParam == NULL) ExecReScan(subnode); + + if (padesc) + { + /* + * Just setting all the number of workers to 0 is enough. The logic + * of choosing the next plan will take care of everything else. + * pa_max_workers is already set initially. + */ + padesc->pa_info[i].pa_num_workers = 0; + } } - node->as_whichplan = 0; - exec_append_initialize_next(node); + + exec_append_scan_first(node); +} + +/* ---------------------------------------------------------------- + * Parallel Append Support + * ---------------------------------------------------------------- + */ + +/* ---------------------------------------------------------------- + * ExecAppendEstimate + * + * estimates the space required to serialize Append node. + * ---------------------------------------------------------------- + */ +void +ExecAppendEstimate(AppendState *node, + ParallelContext *pcxt) +{ + node->pappend_len = + add_size(offsetof(struct ParallelAppendDescData, pa_info), + sizeof(*node->as_padesc->pa_info) * node->as_nplans); + + shm_toc_estimate_chunk(&pcxt->estimator, node->pappend_len); + shm_toc_estimate_keys(&pcxt->estimator, 1); +} + + +/* ---------------------------------------------------------------- + * ExecAppendInitializeDSM + * + * Set up a Parallel Append descriptor. + * ---------------------------------------------------------------- + */ +void +ExecAppendInitializeDSM(AppendState *node, + ParallelContext *pcxt) +{ + ParallelAppendDesc padesc; + List *num_workers_list = ((Append*)node->ps.plan)->num_workers; + ListCell *lc; + int i; + + padesc = shm_toc_allocate(pcxt->toc, node->pappend_len); + SpinLockInit(&padesc->pa_mutex); + + Assert(node->as_nplans == list_length(num_workers_list)); + + i = 0; + foreach(lc, num_workers_list) + { + /* Initialize the max workers count for each subplan. */ + padesc->pa_info[i].pa_max_workers = lfirst_int(lc); + + /* + * Also, initialize current number of workers. Just setting all the + * number of workers to 0 is enough. The logic of choosing the next + * plan in workers will take care of initializing everything else. + */ + padesc->pa_info[i].pa_num_workers = 0; + + i++; + } + + shm_toc_insert(pcxt->toc, node->ps.plan->plan_node_id, padesc); + node->as_padesc = padesc; + + /* Choose the optimal subplan to be executed. */ + (void) parallel_append_next(node); +} + +/* ---------------------------------------------------------------- + * ExecAppendInitializeWorker + * + * Copy relevant information from TOC into planstate, and initialize + * whatever is required to choose and execute the optimal subplan. + * ---------------------------------------------------------------- + */ +void +ExecAppendInitializeWorker(AppendState *node, shm_toc *toc) +{ + node->as_padesc = shm_toc_lookup(toc, node->ps.plan->plan_node_id); + + /* Choose the optimal subplan to be executed. */ + (void) parallel_append_next(node); +} + +/* ---------------------------------------------------------------- + * set_finished + * + * Indicate that this child plan node is about to be finished, so no other + * workers should take up this node. Workers who are already executing + * this node will continue to do so, but workers looking for next nodes to + * pick up would skip this node after this function is called. It is + * possible that multiple workers call this function for the same node at + * the same time, because these workers were executing the same node and + * they finished with it at the same time. The spinlock is not for this + * purpose. The spinlock is used so that it does not change the + * pa_num_workers field while workers are choosing the next node. + * ---------------------------------------------------------------- + */ +static void +set_finished(ParallelAppendDesc padesc, int whichplan) +{ + elog(DEBUG2, "Parallelappend : pid %d : finishing plan %d", + MyProcPid, whichplan); + + SpinLockAcquire(&padesc->pa_mutex); + padesc->pa_info[whichplan].pa_num_workers = -1; + SpinLockRelease(&padesc->pa_mutex); +} + +/* ---------------------------------------------------------------- + * parallel_append_next + * + * Determine the optimal subplan that should be executed. The logic is to + * choose the subplan that is being executed by the least number of + * workers. + * + * Returns false if and only if all subplans are already finished + * processing. + * ---------------------------------------------------------------- + */ +static bool +parallel_append_next(AppendState *state) +{ + ParallelAppendDesc padesc = state->as_padesc; + int whichplan; + int min_whichplan = PA_INVALID_PLAN; + int min_workers = -1; /* Keep compiler quiet */ + + Assert(padesc != NULL); + + SpinLockAcquire(&padesc->pa_mutex); + + /* Choose the plan with the least number of workers */ + for (whichplan = 0; whichplan < state->as_nplans; whichplan++) + { + ParallelAppendInfo *painfo = &padesc->pa_info[whichplan]; + + /* Ignore plans that are already done processing */ + if (painfo->pa_num_workers == -1) + { + elog(DEBUG2, "ParallelAppend : pid %d : ignoring plan %d" + " since pa_num_workers is -1", + MyProcPid, whichplan); + continue; + } + + /* Ignore plans that are already being processed by max_workers */ + if (painfo->pa_num_workers == painfo->pa_max_workers) + { + elog(DEBUG2, "ParallelAppend : pid %d : ignoring plan %d," + " since reached max_worker count %d", + MyProcPid, whichplan, painfo->pa_max_workers); + continue; + } + + /* + * Keep track of the node with the least workers so far. For the very + * first plan, choose that one as the least-workers node. + */ + if (min_whichplan == PA_INVALID_PLAN || + painfo->pa_num_workers < min_workers) + { + min_whichplan = whichplan; + min_workers = painfo->pa_num_workers; + } + } + + /* Increment worker count for the chosen node, if at all we found one. */ + if (min_whichplan != PA_INVALID_PLAN) + padesc->pa_info[min_whichplan].pa_num_workers++; + + /* + * Save the chosen plan index. It can be PA_INVALID_PLAN, which means we + * are done with all nodes (Note : this meaning applies only to *parallel* + * append). + */ + state->as_whichplan = min_whichplan; + + /* + * Note: There is a chance that just after the child plan node is chosen + * here and spinlock released, some other worker finishes this node and + * calls set_finished(). In that case, this worker will go ahead and call + * ExecProcNode(child_node), which will return NULL tuple since it is + * already finished, and then once again this worker will try to choose + * next subplan; but this is ok : it's just an extra "choose_next_subplan" + * operation. + */ + SpinLockRelease(&padesc->pa_mutex); + elog(DEBUG2, "ParallelAppend : pid %d : Chosen plan : %d", + MyProcPid, min_whichplan); + + /* + * If we didn't find any node to work on, it means each subplan is either + * finished or has reached it's pa_max_workers. In such case, should this + * worker wait for some subplan to have its worker count drop below its + * pa_max_workers so that it can choose that subplan ? It turns out that + * it's not worth again finding a subplan to work on. Non-partial subplan + * anyway can have only one worker, and that worker will execute it to + * completion. For a partial subplan, if at all it reaches pa_max_workers, + * it's worker count will reduce only when it's workers find that there is + * nothing more to be executed, so there is no point taking up such node if + * it's worker count reduces. In conclusion, just stop executing once we + * don't find nodes to work on. Indicate the same by returning false. + */ + return (min_whichplan == PA_INVALID_PLAN ? false : true); } diff --git a/src/backend/nodes/copyfuncs.c b/src/backend/nodes/copyfuncs.c index 30d733e..cf8d7d1 100644 --- a/src/backend/nodes/copyfuncs.c +++ b/src/backend/nodes/copyfuncs.c @@ -236,6 +236,7 @@ _copyAppend(const Append *from) * copy remainder of node */ COPY_NODE_FIELD(appendplans); + COPY_NODE_FIELD(num_workers); return newnode; } diff --git a/src/backend/nodes/outfuncs.c b/src/backend/nodes/outfuncs.c index 1560ac3..38e13e0 100644 --- a/src/backend/nodes/outfuncs.c +++ b/src/backend/nodes/outfuncs.c @@ -369,6 +369,7 @@ _outAppend(StringInfo str, const Append *node) _outPlanInfo(str, (const Plan *) node); WRITE_NODE_FIELD(appendplans); + WRITE_NODE_FIELD(num_workers); } static void diff --git a/src/backend/nodes/readfuncs.c b/src/backend/nodes/readfuncs.c index dcfa6ee..8d0cda4 100644 --- a/src/backend/nodes/readfuncs.c +++ b/src/backend/nodes/readfuncs.c @@ -1539,6 +1539,7 @@ _readAppend(void) ReadCommonPlan(&local_node->plan); READ_NODE_FIELD(appendplans); + READ_NODE_FIELD(num_workers); READ_DONE(); } diff --git a/src/backend/optimizer/path/allpaths.c b/src/backend/optimizer/path/allpaths.c index 5c18987..c85271f 100644 --- a/src/backend/optimizer/path/allpaths.c +++ b/src/backend/optimizer/path/allpaths.c @@ -98,7 +98,8 @@ static void generate_mergeappend_paths(PlannerInfo *root, RelOptInfo *rel, static Path *get_cheapest_parameterized_child_path(PlannerInfo *root, RelOptInfo *rel, Relids required_outer); -static List *accumulate_append_subpath(List *subpaths, Path *path); +static List *accumulate_append_subpath(List *subpaths, Path *path, + Bitmapset **partial_subpaths_set); static void set_subquery_pathlist(PlannerInfo *root, RelOptInfo *rel, Index rti, RangeTblEntry *rte); static void set_function_pathlist(PlannerInfo *root, RelOptInfo *rel, @@ -1173,6 +1174,7 @@ set_append_rel_pathlist(PlannerInfo *root, RelOptInfo *rel, bool subpaths_valid = true; List *partial_subpaths = NIL; bool partial_subpaths_valid = true; + Bitmapset *partial_subpath_set = NULL; List *all_child_pathkeys = NIL; List *all_child_outers = NIL; ListCell *l; @@ -1232,14 +1234,52 @@ set_append_rel_pathlist(PlannerInfo *root, RelOptInfo *rel, */ if (childrel->cheapest_total_path->param_info == NULL) subpaths = accumulate_append_subpath(subpaths, - childrel->cheapest_total_path); + childrel->cheapest_total_path, + NULL); else subpaths_valid = false; /* Same idea, but for a partial plan. */ if (childrel->partial_pathlist != NIL) + { partial_subpaths = accumulate_append_subpath(partial_subpaths, - linitial(childrel->partial_pathlist)); + linitial(childrel->partial_pathlist), + &partial_subpath_set); + } + else if (enable_parallelappend) + { + /* + * Extract the first unparameterized, parallel-safe one among the + * child paths. + */ + Path *parallel_safe_path = NULL; + foreach(lcp, childrel->pathlist) + { + Path *child_path = (Path *) lfirst(lcp); + if (child_path->parallel_safe && + child_path->param_info == NULL) + { + parallel_safe_path = child_path; + break; + } + } + + /* If we got one parallel-safe path, add it */ + if (parallel_safe_path) + { + partial_subpaths = + accumulate_append_subpath(partial_subpaths, + parallel_safe_path, NULL); + } + else + { + /* + * This child rel neither has a partial path, nor has a + * parallel-safe path. So drop the idea for partial append path. + */ + partial_subpaths_valid = false; + } + } else partial_subpaths_valid = false; @@ -1314,7 +1354,8 @@ set_append_rel_pathlist(PlannerInfo *root, RelOptInfo *rel, * if we have zero or one live subpath due to constraint exclusion.) */ if (subpaths_valid) - add_path(rel, (Path *) create_append_path(rel, subpaths, NULL, 0)); + add_path(rel, (Path *) create_append_path(rel, subpaths, + NULL, NULL, 0)); /* * Consider an append of partial unordered, unparameterized partial paths. @@ -1322,26 +1363,15 @@ set_append_rel_pathlist(PlannerInfo *root, RelOptInfo *rel, if (partial_subpaths_valid) { AppendPath *appendpath; - ListCell *lc; - int parallel_workers = 0; - - /* - * Decide on the number of workers to request for this append path. - * For now, we just use the maximum value from among the members. It - * might be useful to use a higher number if the Append node were - * smart enough to spread out the workers, but it currently isn't. - */ - foreach(lc, partial_subpaths) - { - Path *path = lfirst(lc); + int parallel_workers; - parallel_workers = Max(parallel_workers, path->parallel_workers); - } - Assert(parallel_workers > 0); + parallel_workers = get_append_num_workers(partial_subpaths, + partial_subpath_set, + NULL); - /* Generate a partial append path. */ - appendpath = create_append_path(rel, partial_subpaths, NULL, - parallel_workers); + appendpath = create_append_path(rel, partial_subpaths, + partial_subpath_set, + NULL, parallel_workers); add_partial_path(rel, (Path *) appendpath); } @@ -1388,12 +1418,13 @@ set_append_rel_pathlist(PlannerInfo *root, RelOptInfo *rel, subpaths_valid = false; break; } - subpaths = accumulate_append_subpath(subpaths, subpath); + subpaths = accumulate_append_subpath(subpaths, subpath, NULL); } if (subpaths_valid) add_path(rel, (Path *) - create_append_path(rel, subpaths, required_outer, 0)); + create_append_path(rel, subpaths, + NULL, required_outer, 0)); } } @@ -1475,9 +1506,11 @@ generate_mergeappend_paths(PlannerInfo *root, RelOptInfo *rel, startup_neq_total = true; startup_subpaths = - accumulate_append_subpath(startup_subpaths, cheapest_startup); + accumulate_append_subpath(startup_subpaths, + cheapest_startup, NULL); total_subpaths = - accumulate_append_subpath(total_subpaths, cheapest_total); + accumulate_append_subpath(total_subpaths, + cheapest_total, NULL); } /* ... and build the MergeAppend paths */ @@ -1568,6 +1601,43 @@ get_cheapest_parameterized_child_path(PlannerInfo *root, RelOptInfo *rel, return cheapest; } +/* concat_append_subpaths + * helper function for accumulate_append_subpath() + * + * child_partial_subpaths_set is the bitmap set to indicate which of the + * childpaths are partial paths. This is currently non-NULL only in case + * the childpaths belong to an Append path. + */ +static List * +concat_append_subpaths(List *append_subpaths, List *childpaths, + Bitmapset **partial_subpaths_set, + Bitmapset *child_partial_subpaths_set) +{ + int i; + int append_subpath_len = list_length(append_subpaths); + + if (partial_subpaths_set) + { + for (i = 0; i < list_length(childpaths); i++) + { + /* + * The child paths themselves may or may not be partial paths. The + * bitmapset numbers of these paths will need to be set considering + * that these are to be appended onto the partial_subpaths_set. + */ + if (!child_partial_subpaths_set || + bms_is_member(i, child_partial_subpaths_set)) + { + *partial_subpaths_set = bms_add_member(*partial_subpaths_set, + append_subpath_len + i); + } + } + } + + /* list_copy is important here to avoid sharing list substructure */ + return list_concat(append_subpaths, list_copy(childpaths)); +} + /* * accumulate_append_subpath * Add a subpath to the list being built for an Append or MergeAppend @@ -1581,26 +1651,34 @@ get_cheapest_parameterized_child_path(PlannerInfo *root, RelOptInfo *rel, * omitting a sort step, which seems fine: if the parent is to be an Append, * its result would be unsorted anyway, while if the parent is to be a * MergeAppend, there's no point in a separate sort on a child. + * + * If partial_subpaths_set is not NULL, it means we are building a + * partial subpaths list, and so we need to add the path (or its child paths + * in case it's Append or MergeAppend) into the partial_subpaths bitmap set. */ static List * -accumulate_append_subpath(List *subpaths, Path *path) +accumulate_append_subpath(List *append_subpaths, Path *path, + Bitmapset **partial_subpaths_set) { if (IsA(path, AppendPath)) { - AppendPath *apath = (AppendPath *) path; - - /* list_copy is important here to avoid sharing list substructure */ - return list_concat(subpaths, list_copy(apath->subpaths)); + return concat_append_subpaths(append_subpaths, + ((AppendPath*)path)->subpaths, + partial_subpaths_set, + ((AppendPath*)path)->partial_subpaths); } else if (IsA(path, MergeAppendPath)) { - MergeAppendPath *mpath = (MergeAppendPath *) path; - - /* list_copy is important here to avoid sharing list substructure */ - return list_concat(subpaths, list_copy(mpath->subpaths)); + return concat_append_subpaths(append_subpaths, + ((MergeAppendPath*)path)->subpaths, + partial_subpaths_set, + NULL); } else - return lappend(subpaths, path); + return concat_append_subpaths(append_subpaths, + list_make1(path), + partial_subpaths_set, + NULL); } /* @@ -1623,7 +1701,7 @@ set_dummy_rel_pathlist(RelOptInfo *rel) rel->pathlist = NIL; rel->partial_pathlist = NIL; - add_path(rel, (Path *) create_append_path(rel, NIL, NULL, 0)); + add_path(rel, (Path *) create_append_path(rel, NIL, NULL, NULL, 0)); /* * We set the cheapest path immediately, to ensure that IS_DUMMY_REL() diff --git a/src/backend/optimizer/path/costsize.c b/src/backend/optimizer/path/costsize.c index 458f139..974d12d 100644 --- a/src/backend/optimizer/path/costsize.c +++ b/src/backend/optimizer/path/costsize.c @@ -126,6 +126,7 @@ bool enable_nestloop = true; bool enable_material = true; bool enable_mergejoin = true; bool enable_hashjoin = true; +bool enable_parallelappend = true; typedef struct { @@ -1552,6 +1553,82 @@ cost_sort(Path *path, PlannerInfo *root, } /* + * cost_append + * Determines and returns the cost of an Append node. + * + * We charge nothing extra for the Append itself, which perhaps is too + * optimistic, but since it doesn't do any selection or projection, it is a + * pretty cheap node. + */ +void +cost_append(Path *path, List *subpaths, Relids required_outer) +{ + ListCell *l; + + path->rows = 0; + path->startup_cost = 0; + path->total_cost = 0; + + if (path->parallel_aware) + { + int parallel_divisor; + + foreach(l, subpaths) + { + Path *subpath = (Path *) lfirst(l); + + /* + * The subpath rows and cost is per worker. We need total count + * of each of the subpaths, so that we can determine the total cost + * of Append. + */ + parallel_divisor = get_parallel_divisor(subpath); + path->rows += (subpath->rows * parallel_divisor); + path->total_cost += (subpath->total_cost * parallel_divisor); + + /* + * Append would start returning tuples when the child node having + * lowest startup cost is done setting up. + */ + path->startup_cost = Min(path->startup_cost, + subpath->startup_cost); + + path->parallel_safe = path->parallel_safe && + subpath->parallel_safe; + + /* All child paths must have same parameterization */ + Assert(bms_equal(PATH_REQ_OUTER(subpath), required_outer)); + } + + /* The row count and cost should represent per-worker figures. */ + parallel_divisor = get_parallel_divisor(path); + path->rows = clamp_row_est(path->rows / parallel_divisor); + path->total_cost /= parallel_divisor; + + } + else + { + /* Compute rows and costs as sums of subplan rows and costs. */ + foreach(l, subpaths) + { + Path *subpath = (Path *) lfirst(l); + + path->rows += subpath->rows; + + path->total_cost += subpath->total_cost; + if (l == list_head(subpaths)) /* first node? */ + path->startup_cost = subpath->startup_cost; + + path->parallel_safe = path->parallel_safe && + subpath->parallel_safe; + + /* All child paths must have same parameterization */ + Assert(bms_equal(PATH_REQ_OUTER(subpath), required_outer)); + } + } +} + +/* * cost_merge_append * Determines and returns the cost of a MergeAppend node. * diff --git a/src/backend/optimizer/path/joinrels.c b/src/backend/optimizer/path/joinrels.c index 6f3c20b..37e755b 100644 --- a/src/backend/optimizer/path/joinrels.c +++ b/src/backend/optimizer/path/joinrels.c @@ -1197,7 +1197,7 @@ mark_dummy_rel(RelOptInfo *rel) rel->partial_pathlist = NIL; /* Set up the dummy path */ - add_path(rel, (Path *) create_append_path(rel, NIL, NULL, 0)); + add_path(rel, (Path *) create_append_path(rel, NIL, NULL, NULL, 0)); /* Set or update cheapest_total_path and related fields */ set_cheapest(rel); diff --git a/src/backend/optimizer/plan/createplan.c b/src/backend/optimizer/plan/createplan.c index fae1f67..b25d53c 100644 --- a/src/backend/optimizer/plan/createplan.c +++ b/src/backend/optimizer/plan/createplan.c @@ -29,6 +29,7 @@ #include "nodes/nodeFuncs.h" #include "optimizer/clauses.h" #include "optimizer/cost.h" +#include "optimizer/pathnode.h" #include "optimizer/paths.h" #include "optimizer/placeholder.h" #include "optimizer/plancat.h" @@ -194,7 +195,7 @@ static CteScan *make_ctescan(List *qptlist, List *qpqual, Index scanrelid, int ctePlanId, int cteParam); static WorkTableScan *make_worktablescan(List *qptlist, List *qpqual, Index scanrelid, int wtParam); -static Append *make_append(List *appendplans, List *tlist); +static Append *make_append(List *appendplans, List *num_workers, List *tlist); static RecursiveUnion *make_recursive_union(List *tlist, Plan *lefttree, Plan *righttree, @@ -962,6 +963,7 @@ create_append_plan(PlannerInfo *root, AppendPath *best_path) Append *plan; List *tlist = build_path_tlist(root, &best_path->path); List *subplans = NIL; + List *num_workers_list; ListCell *subpaths; /* @@ -1000,6 +1002,11 @@ create_append_plan(PlannerInfo *root, AppendPath *best_path) subplans = lappend(subplans, subplan); } + /* Get a list of number of workers for each of the subplans */ + (void) get_append_num_workers(best_path->subpaths, + best_path->partial_subpaths, + &num_workers_list); + /* * XXX ideally, if there's just one child, we'd not bother to generate an * Append node but just return the single child. At the moment this does @@ -1007,7 +1014,7 @@ create_append_plan(PlannerInfo *root, AppendPath *best_path) * parent-rel Vars it'll be asked to emit. */ - plan = make_append(subplans, tlist); + plan = make_append(subplans, num_workers_list, tlist); copy_generic_path_info(&plan->plan, (Path *) best_path); @@ -5009,7 +5016,7 @@ make_foreignscan(List *qptlist, } static Append * -make_append(List *appendplans, List *tlist) +make_append(List *appendplans, List *num_workers, List *tlist) { Append *node = makeNode(Append); Plan *plan = &node->plan; @@ -5019,6 +5026,7 @@ make_append(List *appendplans, List *tlist) plan->lefttree = NULL; plan->righttree = NULL; node->appendplans = appendplans; + node->num_workers = num_workers; return node; } diff --git a/src/backend/optimizer/plan/planner.c b/src/backend/optimizer/plan/planner.c index 4b5902f..d397d1f 100644 --- a/src/backend/optimizer/plan/planner.c +++ b/src/backend/optimizer/plan/planner.c @@ -3394,10 +3394,7 @@ create_grouping_paths(PlannerInfo *root, paths = lappend(paths, path); } path = (Path *) - create_append_path(grouped_rel, - paths, - NULL, - 0); + create_append_path(grouped_rel, paths, NULL, NULL, 0); path->pathtarget = target; } else diff --git a/src/backend/optimizer/prep/prepunion.c b/src/backend/optimizer/prep/prepunion.c index 06e843d..847c4b9 100644 --- a/src/backend/optimizer/prep/prepunion.c +++ b/src/backend/optimizer/prep/prepunion.c @@ -566,7 +566,7 @@ generate_union_path(SetOperationStmt *op, PlannerInfo *root, /* * Append the child results together. */ - path = (Path *) create_append_path(result_rel, pathlist, NULL, 0); + path = (Path *) create_append_path(result_rel, pathlist, NULL, NULL, 0); /* We have to manually jam the right tlist into the path; ick */ path->pathtarget = create_pathtarget(root, tlist); @@ -678,7 +678,7 @@ generate_nonunion_path(SetOperationStmt *op, PlannerInfo *root, /* * Append the child results together. */ - path = (Path *) create_append_path(result_rel, pathlist, NULL, 0); + path = (Path *) create_append_path(result_rel, pathlist, NULL, NULL, 0); /* We have to manually jam the right tlist into the path; ick */ path->pathtarget = create_pathtarget(root, tlist); diff --git a/src/backend/optimizer/util/pathnode.c b/src/backend/optimizer/util/pathnode.c index f440875..e2ead44 100644 --- a/src/backend/optimizer/util/pathnode.c +++ b/src/backend/optimizer/util/pathnode.c @@ -1195,6 +1195,49 @@ create_tidscan_path(PlannerInfo *root, RelOptInfo *rel, List *tidquals, } /* + * get_append_num_workers + * Return the number of workers to request for partial append path. + * Optionally return the list of per-subplan worker count through + * 'per_subplan_workers' + */ +int +get_append_num_workers(List *subpaths, Bitmapset *partial_subpaths_set, + List **per_subplan_workers) +{ + ListCell *lc; + int total_workers = 0; + int subplan_workers; + int i = 0; + + if (per_subplan_workers) + *per_subplan_workers = NIL; + + foreach(lc, subpaths) + { + Path *subpath = lfirst(lc); + + if (bms_is_member(i, partial_subpaths_set)) + subplan_workers = subpath->parallel_workers; + else + subplan_workers = 1; + + if (per_subplan_workers) + { + *per_subplan_workers = + lappend_int(*per_subplan_workers, subplan_workers); + } + total_workers += subplan_workers; + i++; + } + + /* In no case use more than max_parallel_workers_per_gather. */ + total_workers = Min(total_workers, + max_parallel_workers_per_gather); + + return total_workers; +} + +/* * create_append_path * Creates a path corresponding to an Append plan, returning the * pathnode. @@ -1202,50 +1245,28 @@ create_tidscan_path(PlannerInfo *root, RelOptInfo *rel, List *tidquals, * Note that we must handle subpaths = NIL, representing a dummy access path. */ AppendPath * -create_append_path(RelOptInfo *rel, List *subpaths, Relids required_outer, - int parallel_workers) +create_append_path(RelOptInfo *rel, + List *subpaths, Bitmapset *partial_subpaths, + Relids required_outer, int parallel_workers) { AppendPath *pathnode = makeNode(AppendPath); - ListCell *l; pathnode->path.pathtype = T_Append; pathnode->path.parent = rel; pathnode->path.pathtarget = rel->reltarget; pathnode->path.param_info = get_appendrel_parampathinfo(rel, required_outer); - pathnode->path.parallel_aware = false; + pathnode->path.parallel_aware = + (enable_parallelappend && parallel_workers > 0); pathnode->path.parallel_safe = rel->consider_parallel; pathnode->path.parallel_workers = parallel_workers; + pathnode->path.pathkeys = NIL; /* result is always considered * unsorted */ pathnode->subpaths = subpaths; + pathnode->partial_subpaths = partial_subpaths; - /* - * We don't bother with inventing a cost_append(), but just do it here. - * - * Compute rows and costs as sums of subplan rows and costs. We charge - * nothing extra for the Append itself, which perhaps is too optimistic, - * but since it doesn't do any selection or projection, it is a pretty - * cheap node. - */ - pathnode->path.rows = 0; - pathnode->path.startup_cost = 0; - pathnode->path.total_cost = 0; - foreach(l, subpaths) - { - Path *subpath = (Path *) lfirst(l); - - pathnode->path.rows += subpath->rows; - - if (l == list_head(subpaths)) /* first node? */ - pathnode->path.startup_cost = subpath->startup_cost; - pathnode->path.total_cost += subpath->total_cost; - pathnode->path.parallel_safe = pathnode->path.parallel_safe && - subpath->parallel_safe; - - /* All child paths must have same parameterization */ - Assert(bms_equal(PATH_REQ_OUTER(subpath), required_outer)); - } + cost_append(&pathnode->path, subpaths, required_outer); return pathnode; } diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c index 74ca4e7..3588f09 100644 --- a/src/backend/utils/misc/guc.c +++ b/src/backend/utils/misc/guc.c @@ -895,6 +895,16 @@ static struct config_bool ConfigureNamesBool[] = true, NULL, NULL, NULL }, + { + {"enable_parallelappend", PGC_USERSET, QUERY_TUNING_METHOD, + gettext_noop("Enables the planner's use of parallel append plans."), + NULL + }, + &enable_parallelappend, + true, + NULL, NULL, NULL + }, + { {"geqo", PGC_USERSET, QUERY_TUNING_GEQO, diff --git a/src/include/executor/nodeAppend.h b/src/include/executor/nodeAppend.h index 6fb4662..e76027f 100644 --- a/src/include/executor/nodeAppend.h +++ b/src/include/executor/nodeAppend.h @@ -14,11 +14,15 @@ #ifndef NODEAPPEND_H #define NODEAPPEND_H +#include "access/parallel.h" #include "nodes/execnodes.h" extern AppendState *ExecInitAppend(Append *node, EState *estate, int eflags); extern TupleTableSlot *ExecAppend(AppendState *node); extern void ExecEndAppend(AppendState *node); extern void ExecReScanAppend(AppendState *node); +extern void ExecAppendEstimate(AppendState *node, ParallelContext *pcxt); +extern void ExecAppendInitializeDSM(AppendState *node, ParallelContext *pcxt); +extern void ExecAppendInitializeWorker(AppendState *node, shm_toc *toc); #endif /* NODEAPPEND_H */ diff --git a/src/include/nodes/execnodes.h b/src/include/nodes/execnodes.h index f9bcdd6..a21b16d 100644 --- a/src/include/nodes/execnodes.h +++ b/src/include/nodes/execnodes.h @@ -21,6 +21,7 @@ #include "lib/pairingheap.h" #include "nodes/params.h" #include "nodes/plannodes.h" +#include "storage/spin.h" #include "utils/hsearch.h" #include "utils/reltrigger.h" #include "utils/sortsupport.h" @@ -1180,12 +1181,15 @@ typedef struct ModifyTableState * whichplan which plan is being executed (0 .. n-1) * ---------------- */ +struct ParallelAppendDescData; typedef struct AppendState { PlanState ps; /* its first field is NodeTag */ PlanState **appendplans; /* array of PlanStates for my inputs */ int as_nplans; int as_whichplan; + struct ParallelAppendDescData *as_padesc; /* parallel coordination info */ + Size pappend_len; /* size of parallel coordination info */ } AppendState; /* ---------------- diff --git a/src/include/nodes/plannodes.h b/src/include/nodes/plannodes.h index f72f7a8..8c06ee0 100644 --- a/src/include/nodes/plannodes.h +++ b/src/include/nodes/plannodes.h @@ -228,6 +228,7 @@ typedef struct Append { Plan plan; List *appendplans; + List *num_workers; } Append; /* ---------------- diff --git a/src/include/nodes/relation.h b/src/include/nodes/relation.h index 643be54..ac0ff70 100644 --- a/src/include/nodes/relation.h +++ b/src/include/nodes/relation.h @@ -1116,6 +1116,7 @@ typedef struct AppendPath { Path path; List *subpaths; /* list of component Paths */ + Bitmapset *partial_subpaths; } AppendPath; #define IS_DUMMY_PATH(p) \ diff --git a/src/include/optimizer/cost.h b/src/include/optimizer/cost.h index 39376ec..ecbda74 100644 --- a/src/include/optimizer/cost.h +++ b/src/include/optimizer/cost.h @@ -66,6 +66,7 @@ extern bool enable_nestloop; extern bool enable_material; extern bool enable_mergejoin; extern bool enable_hashjoin; +extern bool enable_parallelappend; extern int constraint_exclusion; extern double clamp_row_est(double nrows); @@ -98,6 +99,7 @@ extern void cost_sort(Path *path, PlannerInfo *root, List *pathkeys, Cost input_cost, double tuples, int width, Cost comparison_cost, int sort_mem, double limit_tuples); +extern void cost_append(Path *path, List *subpaths, Relids required_outer); extern void cost_merge_append(Path *path, PlannerInfo *root, List *pathkeys, int n_streams, Cost input_startup_cost, Cost input_total_cost, diff --git a/src/include/optimizer/pathnode.h b/src/include/optimizer/pathnode.h index 7b41317..425d7b9 100644 --- a/src/include/optimizer/pathnode.h +++ b/src/include/optimizer/pathnode.h @@ -14,6 +14,7 @@ #ifndef PATHNODE_H #define PATHNODE_H +#include "nodes/bitmapset.h" #include "nodes/relation.h" @@ -61,8 +62,11 @@ extern BitmapOrPath *create_bitmap_or_path(PlannerInfo *root, List *bitmapquals); extern TidPath *create_tidscan_path(PlannerInfo *root, RelOptInfo *rel, List *tidquals, Relids required_outer); -extern AppendPath *create_append_path(RelOptInfo *rel, List *subpaths, - Relids required_outer, int parallel_workers); +extern int get_append_num_workers(List *subpaths, + Bitmapset *partial_subpaths_set, List **per_subplan_workers); +extern AppendPath *create_append_path(RelOptInfo *rel, + List *subpaths, Bitmapset *partial_subpaths, + Relids required_outer, int parallel_workers); extern MergeAppendPath *create_merge_append_path(PlannerInfo *root, RelOptInfo *rel, List *subpaths, diff --git a/src/test/regress/expected/rangefuncs.out b/src/test/regress/expected/rangefuncs.out index 56481de..92439fe 100644 --- a/src/test/regress/expected/rangefuncs.out +++ b/src/test/regress/expected/rangefuncs.out @@ -1,18 +1,19 @@ SELECT name, setting FROM pg_settings WHERE name LIKE 'enable%'; - name | setting -----------------------+--------- - enable_bitmapscan | on - enable_hashagg | on - enable_hashjoin | on - enable_indexonlyscan | on - enable_indexscan | on - enable_material | on - enable_mergejoin | on - enable_nestloop | on - enable_seqscan | on - enable_sort | on - enable_tidscan | on -(11 rows) + name | setting +-----------------------+--------- + enable_bitmapscan | on + enable_hashagg | on + enable_hashjoin | on + enable_indexonlyscan | on + enable_indexscan | on + enable_material | on + enable_mergejoin | on + enable_nestloop | on + enable_parallelappend | on + enable_seqscan | on + enable_sort | on + enable_tidscan | on +(12 rows) CREATE TABLE foo2(fooid int, f2 int); INSERT INTO foo2 VALUES(1, 11); diff --git a/src/test/regress/expected/select_parallel.out b/src/test/regress/expected/select_parallel.out index 18e21b7..f6c4b41 100644 --- a/src/test/regress/expected/select_parallel.out +++ b/src/test/regress/expected/select_parallel.out @@ -17,9 +17,9 @@ explain (costs off) ----------------------------------------------------- Finalize Aggregate -> Gather - Workers Planned: 1 + Workers Planned: 4 -> Partial Aggregate - -> Append + -> Parallel Append -> Parallel Seq Scan on a_star -> Parallel Seq Scan on b_star -> Parallel Seq Scan on c_star