diff --git a/doc/src/sgml/config.sgml b/doc/src/sgml/config.sgml
index aeda826..c6044e1 100644
--- a/doc/src/sgml/config.sgml
+++ b/doc/src/sgml/config.sgml
@@ -3632,6 +3632,20 @@ ANY <replaceable class="parameter">num_sync</replaceable> (
       </listitem>
      </varlistentry>
 
+     <varlistentry id="guc-enable-parallel-append" xreflabel="enable_parallel_append">
+      <term><varname>enable_parallel_append</varname> (<type>boolean</type>)
+      <indexterm>
+       <primary><varname>enable_parallel_append</varname> configuration parameter</primary>
+      </indexterm>
+      </term>
+      <listitem>
+       <para>
+        Enables or disables the query planner's use of parallel-aware
+        append plan types. The default is <literal>on</literal>.
+       </para>
+      </listitem>
+     </varlistentry>
+
      <varlistentry id="guc-enable-partition-wise-join" xreflabel="enable_partition_wise_join">
       <term><varname>enable_partition_wise_join</varname> (<type>boolean</type>)
diff --git a/doc/src/sgml/monitoring.sgml b/doc/src/sgml/monitoring.sgml
index 6f82033..12a8635 100644
--- a/doc/src/sgml/monitoring.sgml
+++ b/doc/src/sgml/monitoring.sgml
@@ -1117,7 +1117,12 @@ postgres   27093  0.0  0.0  30096  2752 ?        Ss   11:34   0:00 postgres: ser
          <entry>Waiting for TBM shared iterator lock.</entry>
         </row>
         <row>
-         <entry morerows="9"><literal>Lock</literal></entry>
+         <entry><literal>parallel_append</literal></entry>
+         <entry>Waiting to choose the next subplan during Parallel Append plan
+         execution.</entry>
+        </row>
+        <row>
+         <entry morerows="9"><literal>Lock</literal></entry>
          <entry><literal>relation</literal></entry>
          <entry>Waiting to acquire a lock on a relation.</entry>
diff --git a/src/backend/executor/execParallel.c b/src/backend/executor/execParallel.c
index 1b477ba..1445dd4 100644
--- a/src/backend/executor/execParallel.c
+++ b/src/backend/executor/execParallel.c
@@ -25,6 +25,7 @@
 
 #include "executor/execParallel.h"
 #include "executor/executor.h"
+#include "executor/nodeAppend.h"
 #include "executor/nodeBitmapHeapscan.h"
 #include "executor/nodeCustom.h"
 #include "executor/nodeForeignscan.h"
@@ -244,6 +245,11 @@ ExecParallelEstimate(PlanState *planstate, ExecParallelEstimateContext *e)
 			ExecForeignScanEstimate((ForeignScanState *) planstate,
 									e->pcxt);
 			break;
+		case T_AppendState:
+			if (planstate->plan->parallel_aware)
+				ExecAppendEstimate((AppendState *) planstate,
+								   e->pcxt);
+			break;
 		case T_CustomScanState:
 			if (planstate->plan->parallel_aware)
 				ExecCustomScanEstimate((CustomScanState *) planstate,
@@ -316,6 +322,11 @@ ExecParallelInitializeDSM(PlanState *planstate,
 			ExecForeignScanInitializeDSM((ForeignScanState *) planstate,
 										 d->pcxt);
 			break;
+		case T_AppendState:
+			if (planstate->plan->parallel_aware)
+				ExecAppendInitializeDSM((AppendState *) planstate,
+										d->pcxt);
+			break;
 		case T_CustomScanState:
 			if (planstate->plan->parallel_aware)
 				ExecCustomScanInitializeDSM((CustomScanState *) planstate,
@@ -699,6 +710,10 @@ ExecParallelReInitializeDSM(PlanState *planstate,
 			ExecBitmapHeapReInitializeDSM((BitmapHeapScanState *) planstate,
 										  pcxt);
 			break;
+		case T_AppendState:
+			if (planstate->plan->parallel_aware)
+				ExecAppendReInitializeDSM((AppendState *) planstate, pcxt);
+			break;
 		case T_SortState:
 			/* even when not parallel-aware */
 			ExecSortReInitializeDSM((SortState *) planstate, pcxt);
@@ -969,6 +984,10 @@ ExecParallelInitializeWorker(PlanState *planstate, shm_toc *toc)
 			ExecForeignScanInitializeWorker((ForeignScanState *) planstate,
 											toc);
 			break;
+		case T_AppendState:
+			if (planstate->plan->parallel_aware)
+				ExecAppendInitializeWorker((AppendState *) planstate, toc);
+			break;
 		case T_CustomScanState:
 			if (planstate->plan->parallel_aware)
 				ExecCustomScanInitializeWorker((CustomScanState *) planstate,
diff --git a/src/backend/executor/nodeAppend.c b/src/backend/executor/nodeAppend.c
index bed9bb8..fafcccf 100644
--- a/src/backend/executor/nodeAppend.c
+++ b/src/backend/executor/nodeAppend.c
@@ -60,10 +60,43 @@
 #include "executor/execdebug.h"
 #include "executor/nodeAppend.h"
 #include "miscadmin.h"
+#include "optimizer/cost.h"
+#include "storage/spin.h"
 
-static TupleTableSlot *ExecAppend(PlanState *pstate);
-static bool exec_append_initialize_next(AppendState *appendstate);
+/*
+ * Shared state for Parallel Append.
+ *
+ * Each backend participating in a Parallel Append has its own
+ * descriptor in backend-private memory, and all of those descriptors
+ * contain a pointer to this shared structure.
+ */
+typedef struct ParallelAppendDescData
+{
+	LWLock		pa_lock;		/* mutual exclusion to choose next subplan */
+	int			pa_next_plan;	/* next plan to be picked by any worker */
+
+	/*
+	 * pa_finished marks subplans as done. A worker that finishes a subplan
+	 * sets its pa_finished flag to true, so that no new worker picks that
+	 * subplan up. For a non-partial subplan, the worker that picks it up
+	 * sets the flag immediately, to ensure that no more than one worker is
+	 * ever assigned to that subplan.
+	 */
+	bool		pa_finished[FLEXIBLE_ARRAY_MEMBER];
+} ParallelAppendDescData;
+typedef ParallelAppendDescData *ParallelAppendDesc;
+
+/*
+ * Special value of AppendState->as_whichplan for Parallel Append, which
+ * indicates that there are no plans left to execute.
+ */
+#define PA_INVALID_PLAN -1
+
+static TupleTableSlot *ExecAppend(PlanState *pstate);
+static bool exec_append_seq_next(AppendState *appendstate);
+static bool exec_append_parallel_next(AppendState *state);
+static bool exec_append_leader_next(AppendState *state);
 
 
 /* ----------------------------------------------------------------
  *		exec_append_initialize_next
@@ -74,11 +107,20 @@ static bool exec_append_initialize_next(AppendState *appendstate);
  * ----------------------------------------------------------------
  */
 static bool
-exec_append_initialize_next(AppendState *appendstate)
+exec_append_seq_next(AppendState *appendstate)
 {
 	int			whichplan;
 
 	/*
+	 * Not parallel-aware. Fine, just go on to the next subplan in the
+	 * appropriate direction.
+	 */
+	if (ScanDirectionIsForward(appendstate->ps.state->es_direction))
+		appendstate->as_whichplan++;
+	else
+		appendstate->as_whichplan--;
+
+	/*
 	 * get information from the append node
 	 */
 	whichplan = appendstate->as_whichplan;
@@ -185,10 +227,10 @@ ExecInitAppend(Append *node, EState *estate, int eflags)
 	appendstate->ps.ps_ProjInfo = NULL;
 
 	/*
-	 * initialize to scan first subplan
+	 * Initialize to scan first subplan (but note that we'll override this
+	 * later in the case of a parallel append).
 	 */
 	appendstate->as_whichplan = 0;
-	exec_append_initialize_next(appendstate);
 
 	return appendstate;
 }
@@ -204,6 +246,16 @@ ExecAppend(PlanState *pstate)
 {
 	AppendState *node = castNode(AppendState, pstate);
 
+	/*
+	 * If this is the first time we are executing a Parallel Append node,
+	 * we need to choose a subplan first.
+	 */
+	if (node->as_padesc && node->as_whichplan == PA_INVALID_PLAN)
+	{
+		if (!exec_append_parallel_next(node))
+			return ExecClearTuple(node->ps.ps_ResultTupleSlot);
+	}
+
 	for (;;)
 	{
 		PlanState  *subnode;
@@ -232,16 +284,34 @@ ExecAppend(PlanState *pstate)
 		}
 
 		/*
-		 * Go on to the "next" subplan in the appropriate direction. If no
-		 * more subplans, return the empty slot set up for us by
-		 * ExecInitAppend.
+		 * Go on to the "next" subplan. If no more subplans, return the empty
+		 * slot set up for us by ExecInitAppend.
 		 */
-		if (ScanDirectionIsForward(node->ps.state->es_direction))
-			node->as_whichplan++;
+		if (!node->as_padesc)
+		{
+			if (!exec_append_seq_next(node))
+				return ExecClearTuple(node->ps.ps_ResultTupleSlot);
+		}
 		else
-			node->as_whichplan--;
-		if (!exec_append_initialize_next(node))
-			return ExecClearTuple(node->ps.ps_ResultTupleSlot);
+		{
+			/*
+			 * Parallel-aware Append follows different logic for choosing
+			 * the next subplan.
+			 */
+
+			/*
+			 * We are done with this subplan. There might be other workers
+			 * still processing the last chunk of rows for this same
+			 * subplan, but there's no point in new workers picking it up,
+			 * so mark it finished.
+			 */
+			LWLockAcquire(&node->as_padesc->pa_lock, LW_EXCLUSIVE);
+			node->as_padesc->pa_finished[node->as_whichplan] = true;
+			LWLockRelease(&node->as_padesc->pa_lock);
+
+			if (!exec_append_parallel_next(node))
+				return ExecClearTuple(node->ps.ps_ResultTupleSlot);
+		}
 
 		/* Else loop back and try to get a tuple from the new subplan */
 	}
@@ -299,5 +369,265 @@ ExecReScanAppend(AppendState *node)
 			ExecReScan(subnode);
 	}
 	node->as_whichplan = 0;
-	exec_append_initialize_next(node);
+}
+
+/* ----------------------------------------------------------------
+ *						Parallel Append Support
+ * ----------------------------------------------------------------
+ */
+
+/* ----------------------------------------------------------------
+ *		ExecAppendEstimate
+ *
+ *		Estimates the space required to serialize the Append node.
+ * ----------------------------------------------------------------
+ */
+void
+ExecAppendEstimate(AppendState *node,
+				   ParallelContext *pcxt)
+{
+	node->pappend_len =
+		add_size(offsetof(struct ParallelAppendDescData, pa_finished),
+				 sizeof(bool) * node->as_nplans);
+
+	shm_toc_estimate_chunk(&pcxt->estimator, node->pappend_len);
+	shm_toc_estimate_keys(&pcxt->estimator, 1);
+}
+
+
+/* ----------------------------------------------------------------
+ *		ExecAppendInitializeDSM
+ *
+ *		Set up a Parallel Append descriptor.
+ * ----------------------------------------------------------------
+ */
+void
+ExecAppendInitializeDSM(AppendState *node,
+						ParallelContext *pcxt)
+{
+	ParallelAppendDesc padesc;
+
+	padesc = shm_toc_allocate(pcxt->toc, node->pappend_len);
+
+	/*
+	 * Just setting all the fields to 0 is enough. The logic for choosing
+	 * the next plan in the workers will take care of everything else.
+	 */
+	memset(padesc, 0, sizeof(ParallelAppendDescData));
+	memset(padesc->pa_finished, 0, sizeof(bool) * node->as_nplans);
+
+	LWLockInitialize(&padesc->pa_lock, LWTRANCHE_PARALLEL_APPEND);
+
+	node->as_padesc = padesc;
+	node->as_whichplan = PA_INVALID_PLAN;
+
+	shm_toc_insert(pcxt->toc, node->ps.plan->plan_node_id, padesc);
+}
+
+/* ----------------------------------------------------------------
+ *		ExecAppendReInitializeDSM
+ *
+ *		Reset shared state before beginning a fresh scan.
+ * ----------------------------------------------------------------
+ */
+void
+ExecAppendReInitializeDSM(AppendState *node, ParallelContext *pcxt)
+{
+	ParallelAppendDesc padesc = node->as_padesc;
+
+	padesc->pa_next_plan = 0;
+	memset(padesc->pa_finished, 0, sizeof(bool) * node->as_nplans);
+	node->as_whichplan = PA_INVALID_PLAN;
+}
+
+/* ----------------------------------------------------------------
+ *		ExecAppendInitializeWorker
+ *
+ *		Copy relevant information from the TOC into planstate, and
+ *		initialize whatever is required to choose and execute the next
+ *		subplan.
+ * ----------------------------------------------------------------
+ */
+void
+ExecAppendInitializeWorker(AppendState *node, shm_toc *toc)
+{
+	node->as_padesc = shm_toc_lookup(toc, node->ps.plan->plan_node_id, false);
+	node->as_whichplan = PA_INVALID_PLAN;
+	node->is_parallel_worker = true;
+}
+
+/* ----------------------------------------------------------------
+ *		exec_append_parallel_next
+ *
+ *		Determine the next subplan that should be executed. Each worker
+ *		uses the shared field 'pa_next_plan' to start looking for an
+ *		unfinished plan, executes the subplan it finds, and then advances
+ *		pa_next_plan to the following subplan, so that other workers know
+ *		which plan to pick next. This way, workers choose the subplans in
+ *		round-robin order, and thus get distributed evenly among them.
+ *
+ *		Returns false if and only if all subplans are already finished
+ *		processing.
+ * ----------------------------------------------------------------
+ */
+static bool
+exec_append_parallel_next(AppendState *state)
+{
+	ParallelAppendDesc padesc = state->as_padesc;
+	int			whichplan;
+	int			nextplan;
+	int			initial_plan;
+	int			first_partial_plan = ((Append *) state->ps.plan)->first_partial_plan;
+	bool		found;
+
+	Assert(padesc != NULL);
+
+	/* Backward scan is not supported by parallel-aware plans */
+	Assert(ScanDirectionIsForward(state->ps.state->es_direction));
+
+	/* The parallel leader chooses its next subplan differently */
+	if (!state->is_parallel_worker)
+		return exec_append_leader_next(state);
+
+	LWLockAcquire(&padesc->pa_lock, LW_EXCLUSIVE);
+
+	/* If all the plans are already done, we have nothing to do */
+	if (padesc->pa_next_plan == PA_INVALID_PLAN)
+	{
+		LWLockRelease(&padesc->pa_lock);
+		return false;
+	}
+
+	/* Make a note of which subplan we have started with */
+	initial_plan = nextplan = padesc->pa_next_plan;
+
+	/*
+	 * Keep going to the next plan until we find an unfinished one, or until
+	 * we have come full circle. Finished plans also include non-partial
+	 * subplans that have already been taken by a worker.
+	 */
+	do
+	{
+		whichplan = nextplan;
+
+		/*
+		 * Either go to the next plan, or, if we are at the last plan, wrap
+		 * around to the first partial one. We don't have to go back to the
+		 * non-partial plans: because of the round-robin traversal, the fact
+		 * that we are wrapping around means that all the non-partial plans
+		 * have already been taken.
+		 */
+		if (whichplan + 1 == state->as_nplans)
+		{
+			nextplan = first_partial_plan;
+
+			/*
+			 * If we had started from a non-partial plan, we have by now
+			 * searched all the non-partial and partial plans.
+			 */
+			if (initial_plan <= first_partial_plan)
+				break;
+		}
+		else
+		{
+			nextplan = whichplan + 1;
+
+			/* Have we come full circle? */
+			if (nextplan == initial_plan)
+				break;
+		}
+	} while (padesc->pa_finished[whichplan]);
+
+	/*
+	 * Note: there is a chance that just after a child plan is chosen above,
+	 * some other worker finishes it and sets its pa_finished flag to true.
+	 * In that case, this worker will go ahead and call
+	 * ExecProcNode(child_node), which will return a NULL tuple since the
+	 * plan is already finished, and then this worker will once again try to
+	 * choose the next subplan. But this is OK: it just costs one extra
+	 * "choose next subplan" operation.
+	 */
+
+	Assert(0 <= whichplan && whichplan < state->as_nplans);
+
+	found = !padesc->pa_finished[whichplan];
+
+	/* If we found no plan, indicate that to the other workers */
+	if (!found)
+		state->as_whichplan = padesc->pa_next_plan = PA_INVALID_PLAN;
+	else
+	{
+		/* Set the chosen plan */
+		state->as_whichplan = whichplan;
+
+		/* If this is a non-partial plan, immediately mark it finished */
+		if (whichplan < first_partial_plan)
+			padesc->pa_finished[whichplan] = true;
+
+		/*
+		 * nextplan can be state->as_nplans if we wrapped around to the
+		 * first partial plan but there were no partial plans.
+		 */
+		padesc->pa_next_plan = (nextplan == state->as_nplans ?
+								PA_INVALID_PLAN : nextplan);
+	}
+
+	LWLockRelease(&padesc->pa_lock);
+
+	return found;
+}
+
+/* ----------------------------------------------------------------
+ *		exec_append_leader_next
+ *
+ *		Choose the next subplan; to be used only by the parallel leader.
+ *		With more workers, the leader is known to do more work servicing
+ *		the worker tuple queues, and less work contributing to the parallel
+ *		processing itself. Hence, it should not take expensive plans;
+ *		otherwise it will slow down the total time to finish the Append.
+ *		Since we keep the non-partial plans sorted in descending order of
+ *		cost, let the leader scan backwards from the last plan, i.e.
+ *		starting with the cheapest plan.
+ * ----------------------------------------------------------------
+ */
+static bool
+exec_append_leader_next(AppendState *state)
+{
+	ParallelAppendDesc padesc = state->as_padesc;
+	int			whichplan;
+	int			worker_next_plan;
+	bool		found = false;
+
+	LWLockAcquire(&padesc->pa_lock, LW_EXCLUSIVE);
+
+	worker_next_plan = padesc->pa_next_plan;
+
+	if (worker_next_plan != PA_INVALID_PLAN)
+	{
+		int			first_partial_plan = ((Append *) state->ps.plan)->first_partial_plan;
+		bool		nonpartial_plans_finished = (worker_next_plan >= first_partial_plan);
+
+		/* The parallel leader starts from the last subplan. */
+		for (whichplan = state->as_nplans - 1; whichplan >= 0; whichplan--)
+		{
+			if (!padesc->pa_finished[whichplan])
+			{
+				found = true;
+
+				/* If this is a non-partial plan, immediately mark it finished */
+				if (whichplan < first_partial_plan)
+					padesc->pa_finished[whichplan] = true;
+				break;
+			}
+
+			/*
+			 * If we have reached the non-partial plans but they are all
+			 * done already, there is no point in going back further.
+			 */
+			if (whichplan < first_partial_plan && nonpartial_plans_finished)
+				break;
+		}
+	}
+
+	LWLockRelease(&padesc->pa_lock);
+
+	state->as_whichplan = (found ? whichplan : PA_INVALID_PLAN);
+
+	/* Return false only if we didn't find any plan to execute */
+	return found;
 }
diff --git a/src/backend/nodes/copyfuncs.c b/src/backend/nodes/copyfuncs.c
index c1a83ca..5852484 100644
--- a/src/backend/nodes/copyfuncs.c
+++ b/src/backend/nodes/copyfuncs.c
@@ -242,6 +242,7 @@ _copyAppend(const Append *from)
 	 */
 	COPY_NODE_FIELD(partitioned_rels);
 	COPY_NODE_FIELD(appendplans);
+	COPY_SCALAR_FIELD(first_partial_plan);
 
 	return newnode;
 }
diff --git a/src/backend/nodes/list.c b/src/backend/nodes/list.c
index acaf4b5..75761a9 100644
--- a/src/backend/nodes/list.c
+++ b/src/backend/nodes/list.c
@@ -1250,6 +1250,45 @@ list_copy_tail(const List *oldlist, int nskip)
 }
 
 /*
+ * Sort a list using qsort. A sorted list is built, but the cells of the
+ * original list are re-used. The caller must pass a copy of the list if the
+ * original list needs to remain untouched. Effectively, the comparator
+ * function is passed pointers to ListCell* pointers.
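+ *
+ * For example (an illustrative, hypothetical comparator, not part of this
+ * file), sorting Paths by descending total_cost, as
+ * append_total_cost_compare in pathnode.c does, requires dereferencing
+ * twice:
+ *
+ *		static int
+ *		my_cmp(const void *a, const void *b)
+ *		{
+ *			Path	   *p1 = (Path *) lfirst(*(ListCell **) a);
+ *			Path	   *p2 = (Path *) lfirst(*(ListCell **) b);
+ *
+ *			return (p1->total_cost < p2->total_cost) -
+ *				   (p1->total_cost > p2->total_cost);
+ *		}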
+ */ +List * +list_qsort(const List *list, list_qsort_comparator cmp) +{ + ListCell *cell; + int i; + int len = list_length(list); + ListCell **list_arr; + List *new_list; + + if (len == 0) + return NIL; + + i = 0; + list_arr = palloc(sizeof(ListCell *) * len); + foreach(cell, list) + list_arr[i++] = cell; + + qsort(list_arr, len, sizeof(ListCell *), cmp); + + new_list = (List *) palloc(sizeof(List)); + new_list->type = T_List; + new_list->length = len; + new_list->head = list_arr[0]; + new_list->tail = list_arr[len-1]; + + for (i = 0; i < len-1; i++) + list_arr[i]->next = list_arr[i+1]; + + list_arr[len-1]->next = NULL; + pfree(list_arr); + return new_list; +} + +/* * Temporary compatibility functions * * In order to avoid warnings for these function definitions, we need diff --git a/src/backend/nodes/outfuncs.c b/src/backend/nodes/outfuncs.c index 43d6206..3236d58 100644 --- a/src/backend/nodes/outfuncs.c +++ b/src/backend/nodes/outfuncs.c @@ -399,6 +399,7 @@ _outAppend(StringInfo str, const Append *node) WRITE_NODE_FIELD(partitioned_rels); WRITE_NODE_FIELD(appendplans); + WRITE_INT_FIELD(first_partial_plan); } static void diff --git a/src/backend/nodes/readfuncs.c b/src/backend/nodes/readfuncs.c index ccb6a1f..23fcc1b 100644 --- a/src/backend/nodes/readfuncs.c +++ b/src/backend/nodes/readfuncs.c @@ -1600,6 +1600,7 @@ _readAppend(void) READ_NODE_FIELD(partitioned_rels); READ_NODE_FIELD(appendplans); + READ_INT_FIELD(first_partial_plan); READ_DONE(); } diff --git a/src/backend/optimizer/path/allpaths.c b/src/backend/optimizer/path/allpaths.c index 5535b63..902ddc1 100644 --- a/src/backend/optimizer/path/allpaths.c +++ b/src/backend/optimizer/path/allpaths.c @@ -102,6 +102,9 @@ static Path *get_cheapest_parameterized_child_path(PlannerInfo *root, RelOptInfo *rel, Relids required_outer); static List *accumulate_append_subpath(List *subpaths, Path *path); +static List *accumulate_partialappend_subpath(List *partial_subpaths, + Path *subpath, bool is_partial, + List **nonpartial_subpaths); static void set_subquery_pathlist(PlannerInfo *root, RelOptInfo *rel, Index rti, RangeTblEntry *rte); static void set_function_pathlist(PlannerInfo *root, RelOptInfo *rel, @@ -1310,7 +1313,10 @@ set_append_rel_pathlist(PlannerInfo *root, RelOptInfo *rel, * non-dummy children. For every such parameterization or ordering, it creates * an append path collecting one path from each non-dummy child with given * parameterization or ordering. Similarly it collects partial paths from - * non-dummy children to create partial append paths. + * non-dummy children to create partial append paths. Furthermore, it creates + * a parallel-aware partial Append path that can contain a mix of partial and + * non-partial paths of its children. + * */ static void add_paths_to_append_rel(PlannerInfo *root, RelOptInfo *rel, @@ -1319,7 +1325,10 @@ add_paths_to_append_rel(PlannerInfo *root, RelOptInfo *rel, List *subpaths = NIL; bool subpaths_valid = true; List *partial_subpaths = NIL; + List *pa_partial_subpaths = NIL; + List *pa_nonpartial_subpaths = NIL; bool partial_subpaths_valid = true; + bool pa_subpaths_valid = enable_parallel_append; List *all_child_pathkeys = NIL; List *all_child_outers = NIL; ListCell *l; @@ -1401,7 +1410,62 @@ add_paths_to_append_rel(PlannerInfo *root, RelOptInfo *rel, else subpaths_valid = false; - /* Same idea, but for a partial plan. */ + /* Same idea, but for a parallel append path. 
+		 */
+		if (pa_subpaths_valid && enable_parallel_append)
+		{
+			Path	   *chosen_path = NULL;
+			Path	   *cheapest_partial_path = NULL;
+			Path	   *cheapest_parallel_safe_path = NULL;
+
+			/*
+			 * Extract the cheapest unparameterized, parallel-safe path
+			 * among the child's paths.
+			 */
+			cheapest_parallel_safe_path =
+				get_cheapest_parallel_safe_total_inner(childrel->pathlist);
+
+			/* Get the cheapest partial path */
+			if (childrel->partial_pathlist != NIL)
+				cheapest_partial_path = linitial(childrel->partial_pathlist);
+
+			if (!cheapest_parallel_safe_path && !cheapest_partial_path)
+			{
+				/*
+				 * This child rel has neither a partial path nor a
+				 * parallel-safe path. Give up on the parallel append.
+				 */
+				pa_subpaths_valid = false;
+			}
+			else if (cheapest_partial_path && cheapest_parallel_safe_path)
+			{
+				/* Both are valid. Choose the cheaper of the two. */
+				if (cheapest_parallel_safe_path->total_cost <
+					cheapest_partial_path->total_cost)
+					chosen_path = cheapest_parallel_safe_path;
+				else
+					chosen_path = cheapest_partial_path;
+			}
+			else
+			{
+				/* Only one of the two is valid. Choose that one. */
+				chosen_path = cheapest_partial_path ?
+					cheapest_partial_path :
+					cheapest_parallel_safe_path;
+			}
+
+			/* If we got a valid path, add it */
+			if (chosen_path)
+			{
+				pa_partial_subpaths =
+					accumulate_partialappend_subpath(pa_partial_subpaths,
+													 chosen_path,
+													 chosen_path == cheapest_partial_path,
+													 &pa_nonpartial_subpaths);
+			}
+		}
+
+		/* Same idea, but for a non-parallel partial plan. */
 		if (childrel->partial_pathlist != NIL)
 			partial_subpaths = accumulate_append_subpath(partial_subpaths,
 									   linitial(childrel->partial_pathlist));
@@ -1479,35 +1543,47 @@ add_paths_to_append_rel(PlannerInfo *root, RelOptInfo *rel,
 	 * if we have zero or one live subpath due to constraint exclusion.)
 	 */
 	if (subpaths_valid)
-		add_path(rel, (Path *) create_append_path(rel, subpaths, NULL, 0,
+		add_path(rel, (Path *) create_append_path(rel, subpaths, NIL,
+												  NULL, 0, false,
 												  partitioned_rels));
 
+	/* Consider a parallel append path. */
+	if (pa_subpaths_valid)
+	{
+		AppendPath *appendpath;
+		int			parallel_workers;
+
+		parallel_workers = get_append_num_workers(pa_partial_subpaths,
+												  pa_nonpartial_subpaths,
+												  true);
+		appendpath = create_append_path(rel, pa_nonpartial_subpaths,
+										pa_partial_subpaths,
+										NULL, parallel_workers, true,
+										partitioned_rels);
+		add_partial_path(rel, (Path *) appendpath);
+	}
+
 	/*
-	 * Consider an append of partial unordered, unparameterized partial paths.
+	 * If a parallel append has not been added above, or the one added has a
+	 * mix of partial and non-partial subpaths, then consider another
+	 * Parallel Append path that has *all* partial subpaths. We can add such
+	 * a path only if all the child rels have partial paths in the first
+	 * place. This new path will be parallel-aware unless
+	 * enable_parallel_append is off.
 	 */
-	if (partial_subpaths_valid)
+	if (partial_subpaths_valid &&
+		(!pa_subpaths_valid || pa_nonpartial_subpaths != NIL))
 	{
 		AppendPath *appendpath;
-		ListCell   *lc;
-		int			parallel_workers = 0;
-
-		/*
-		 * Decide on the number of workers to request for this append path.
-		 * For now, we just use the maximum value from among the members. It
-		 * might be useful to use a higher number if the Append node were
-		 * smart enough to spread out the workers, but it currently isn't.
-		 */
-		foreach(lc, partial_subpaths)
-		{
-			Path	   *path = lfirst(lc);
+		int			parallel_workers;
 
-			parallel_workers = Max(parallel_workers, path->parallel_workers);
-		}
-		Assert(parallel_workers > 0);
+		parallel_workers = get_append_num_workers(partial_subpaths,
+												  NIL,
+												  enable_parallel_append);
+		appendpath = create_append_path(rel, NIL, partial_subpaths,
+										NULL, parallel_workers,
+										enable_parallel_append,
+										partitioned_rels);
 
-		/* Generate a partial append path. */
-		appendpath = create_append_path(rel, partial_subpaths, NULL,
-										parallel_workers, partitioned_rels);
 		add_partial_path(rel, (Path *) appendpath);
 	}
 
@@ -1560,7 +1636,8 @@ add_paths_to_append_rel(PlannerInfo *root, RelOptInfo *rel,
 
 		if (subpaths_valid)
 			add_path(rel, (Path *)
-					 create_append_path(rel, subpaths, required_outer, 0,
+					 create_append_path(rel, subpaths, NIL,
+										required_outer, 0, false,
 										partitioned_rels));
 	}
 }
@@ -1778,6 +1855,69 @@ accumulate_append_subpath(List *subpaths, Path *path)
 }
 
 /*
+ * accumulate_partialappend_subpath:
+ *		Add a subpath to the list being built for a partial Append.
+ *
+ * This is the same as accumulate_append_subpath, except that two separate
+ * lists are built: one containing only partial subpaths, and the other
+ * containing only non-partial subpaths. Also, the non-partial paths are
+ * kept ordered by descending total cost.
+ *
+ * is_partial is true if the subpath being added is a partial subpath.
+ */
+static List *
+accumulate_partialappend_subpath(List *partial_subpaths,
+								 Path *subpath, bool is_partial,
+								 List **nonpartial_subpaths)
+{
+	/* list_copy is important here to avoid sharing list substructure */
+
+	if (IsA(subpath, AppendPath))
+	{
+		AppendPath *apath = (AppendPath *) subpath;
+		List	   *apath_partial_paths;
+		List	   *apath_nonpartial_paths;
+
+		/* Split the Append subpaths into partial and non-partial paths */
+		apath_nonpartial_paths = list_truncate(list_copy(apath->subpaths),
+											   apath->first_partial_path);
+		apath_partial_paths = list_copy_tail(apath->subpaths,
+											 apath->first_partial_path);
+
+		/* Add the non-partial subpaths, if any. */
+		*nonpartial_subpaths = list_concat(*nonpartial_subpaths,
+										   list_copy(apath_nonpartial_paths));
+
+		/* Add the partial subpaths, if any. */
+		partial_subpaths = list_concat(partial_subpaths, apath_partial_paths);
+	}
+	else if (IsA(subpath, MergeAppendPath))
+	{
+		MergeAppendPath *mpath = (MergeAppendPath *) subpath;
+
+		/* We don't create partial MergeAppend paths */
+		Assert(!is_partial);
+
+		/*
+		 * Since the MergeAppendPath itself is non-partial, treat all its
+		 * subpaths as non-partial.
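+		 * Each of them will then be taken up by at most one worker, like
+		 * any other non-partial subpath.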
+		 */
+		*nonpartial_subpaths = list_concat(*nonpartial_subpaths,
+										   list_copy(mpath->subpaths));
+	}
+	else
+	{
+		/* Just add it to the proper list, depending on whether it's partial */
+		if (is_partial)
+			partial_subpaths = lappend(partial_subpaths, subpath);
+		else
+			*nonpartial_subpaths = lappend(*nonpartial_subpaths, subpath);
+	}
+
+	return partial_subpaths;
+}
+
+/*
  * set_dummy_rel_pathlist
  *	  Build a dummy path for a relation that's been excluded by constraints
  *
@@ -1797,7 +1937,8 @@ set_dummy_rel_pathlist(RelOptInfo *rel)
 	rel->pathlist = NIL;
 	rel->partial_pathlist = NIL;
 
-	add_path(rel, (Path *) create_append_path(rel, NIL, NULL, 0, NIL));
+	add_path(rel, (Path *) create_append_path(rel, NIL, NIL, NULL,
+											  0, false, NIL));
 
 	/*
 	 * We set the cheapest path immediately, to ensure that IS_DUMMY_REL()
diff --git a/src/backend/optimizer/path/costsize.c b/src/backend/optimizer/path/costsize.c
index ce32b8a4..5ae02c8 100644
--- a/src/backend/optimizer/path/costsize.c
+++ b/src/backend/optimizer/path/costsize.c
@@ -128,6 +128,7 @@ bool		enable_mergejoin = true;
 bool		enable_hashjoin = true;
 bool		enable_gathermerge = true;
 bool		enable_partition_wise_join = false;
+bool		enable_parallel_append = true;
 
 typedef struct
 {
@@ -160,6 +161,8 @@ static Selectivity get_foreign_key_join_selectivity(PlannerInfo *root,
 								  Relids inner_relids,
 								  SpecialJoinInfo *sjinfo,
 								  List **restrictlist);
+static Cost append_nonpartial_cost(List *subpaths, int numpaths,
+					   int parallel_workers);
 static void set_rel_width(PlannerInfo *root, RelOptInfo *rel);
 static double relation_byte_size(double tuples, int width);
 static double page_size(double tuples, int width);
@@ -1742,6 +1745,189 @@ cost_sort(Path *path, PlannerInfo *root,
 }
 
 /*
+ * append_nonpartial_cost
+ *	  Determine the cost of the non-partial subpaths of an Append node.
+ *
+ * This is the number of cost units it takes the participating workers to
+ * finish all the non-partial subpaths, i.e. the highest cost accumulated by
+ * any single worker. 'subpaths' contains the non-partial paths followed by
+ * the partial paths; 'numpaths' gives the number of non-partial paths.
+ */
+static Cost
+append_nonpartial_cost(List *subpaths, int numpaths, int parallel_workers)
+{
+	Cost	   *costarr;
+	int			arrlen;
+	ListCell   *l;
+	ListCell   *cell;
+	int			i;
+	int			path_index;
+	int			min_index;
+	int			max_index;
+
+	if (numpaths == 0)
+		return 0;
+
+	/*
+	 * Build the cost array, holding the costs of the first N non-partial
+	 * subpaths, where N is the lesser of parallel_workers and the number of
+	 * non-partial subpaths.
+	 */
+	arrlen = Min(parallel_workers, numpaths);
+	costarr = (Cost *) palloc(sizeof(Cost) * arrlen);
+	path_index = 0;
+	foreach(cell, subpaths)
+	{
+		Path	   *subpath = (Path *) lfirst(cell);
+
+		if (path_index == arrlen)
+			break;
+		costarr[path_index++] = subpath->total_cost;
+	}
+
+	/*
+	 * The non-partial subpaths are sorted by decreasing cost, so the array
+	 * starts out sorted that way too; hence the last element is the one
+	 * with the minimum cost.
+	 */
+	min_index = arrlen - 1;
+
+	/*
+	 * For each of the remaining subpaths, add its cost to the array element
+	 * with the minimum cost.
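+	 *
+	 * For instance (illustrative numbers only): with parallel_workers = 2
+	 * and non-partial costs (20, 10, 5, 5), the array starts as (20, 10);
+	 * each remaining cost is added to the cheaper slot, giving (20, 15) and
+	 * then (20, 20), so the function returns 20.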
+ */ + for_each_cell(l, cell) + { + Path *subpath = (Path *) lfirst(l); + int i; + + /* Consider only the non-partial paths */ + if (path_index++ == numpaths) + break; + + costarr[min_index] += subpath->total_cost; + + /* Update the new min cost array index */ + for (min_index = i = 0; i < arrlen; i++) + { + if (costarr[i] < costarr[min_index]) + min_index = i; + } + } + + /* Return the highest cost from the array */ + for (max_index = i = 0; i < arrlen; i++) + { + if (costarr[i] > costarr[max_index]) + max_index = i; + } + + return costarr[max_index]; +} + +/* + * cost_append + * Determines and returns the cost of an Append node. + * + * We charge nothing extra for the Append itself, which perhaps is too + * optimistic, but since it doesn't do any selection or projection, it is a + * pretty cheap node. + */ +void +cost_append(Path *path, List *subpaths, int num_nonpartial_subpaths) +{ + ListCell *l; + + path->rows = 0; + path->startup_cost = 0; + path->total_cost = 0; + + if (list_length(subpaths) == 0) + return; + + if (!path->parallel_aware) + { + Path *subpath = (Path *) linitial(subpaths); + + /* + * Startup cost of non-parallel-aware Append is the startup cost of + * first subpath. + */ + path->startup_cost = subpath->startup_cost; + + /* Compute rows and costs as sums of subplan rows and costs. */ + foreach(l, subpaths) + { + Path *subpath = (Path *) lfirst(l); + + path->rows += subpath->rows; + path->total_cost += subpath->total_cost; + } + } + else /* parallel-aware */ + { + double max_rows = 0; + double nonpartial_rows = 0; + int i = 0; + + /* Include the non-partial paths total cost */ + path->total_cost += append_nonpartial_cost(subpaths, + num_nonpartial_subpaths, + path->parallel_workers); + + /* Calculate startup cost; also add up all the rows for later use */ + foreach(l, subpaths) + { + Path *subpath = (Path *) lfirst(l); + + /* + * Append would start returning tuples when the child node having + * lowest startup cost is done setting up. We consider only the + * first few subplans that immediately get a worker assigned. + */ + if (i < path->parallel_workers) + { + path->startup_cost = Min(path->startup_cost, + subpath->startup_cost); + } + + if (i < num_nonpartial_subpaths) + { + nonpartial_rows += subpath->rows; + + /* Also keep track of max rows for any given subpath */ + max_rows = Max(max_rows, subpath->rows); + } + + i++; + } + + /* + * As an approximation, non-partial rows are calculated as total rows + * divided by number of workers. But if there are highly unequal number + * of rows across the paths, this figure might not reflect correctly. + * So we make a note that it also should not be less than the maximum + * of all the path rows. + */ + nonpartial_rows /= path->parallel_workers; + path->rows += Max(nonpartial_rows, max_rows); + + /* Calculate partial paths cost. */ + if (list_length(subpaths) > num_nonpartial_subpaths) + { + /* Compute rows and costs as sums of subplan rows and costs. */ + for_each_cell(l, list_nth_cell(subpaths, num_nonpartial_subpaths)) + { + Path *subpath = (Path *) lfirst(l); + + path->rows += subpath->rows; + path->total_cost += subpath->total_cost; + } + } + } +} + +/* * cost_merge_append * Determines and returns the cost of a MergeAppend node. 
* diff --git a/src/backend/optimizer/path/joinrels.c b/src/backend/optimizer/path/joinrels.c index 2b868c5..9f457d4 100644 --- a/src/backend/optimizer/path/joinrels.c +++ b/src/backend/optimizer/path/joinrels.c @@ -1232,7 +1232,8 @@ mark_dummy_rel(RelOptInfo *rel) rel->partial_pathlist = NIL; /* Set up the dummy path */ - add_path(rel, (Path *) create_append_path(rel, NIL, NULL, 0, NIL)); + add_path(rel, (Path *) create_append_path(rel, NIL, NIL, NULL, + 0, false, NIL)); /* Set or update cheapest_total_path and related fields */ set_cheapest(rel); diff --git a/src/backend/optimizer/plan/createplan.c b/src/backend/optimizer/plan/createplan.c index c802d61..93a5296 100644 --- a/src/backend/optimizer/plan/createplan.c +++ b/src/backend/optimizer/plan/createplan.c @@ -203,7 +203,8 @@ static NamedTuplestoreScan *make_namedtuplestorescan(List *qptlist, List *qpqual Index scanrelid, char *enrname); static WorkTableScan *make_worktablescan(List *qptlist, List *qpqual, Index scanrelid, int wtParam); -static Append *make_append(List *appendplans, List *tlist, List *partitioned_rels); +static Append *make_append(List *appendplans, int first_partial_plan, + List *tlist, List *partitioned_rels); static RecursiveUnion *make_recursive_union(List *tlist, Plan *lefttree, Plan *righttree, @@ -1050,7 +1051,8 @@ create_append_plan(PlannerInfo *root, AppendPath *best_path) * parent-rel Vars it'll be asked to emit. */ - plan = make_append(subplans, tlist, best_path->partitioned_rels); + plan = make_append(subplans, best_path->first_partial_path, + tlist, best_path->partitioned_rels); copy_generic_path_info(&plan->plan, (Path *) best_path); @@ -5285,7 +5287,7 @@ make_foreignscan(List *qptlist, } static Append * -make_append(List *appendplans, List *tlist, List *partitioned_rels) +make_append(List *appendplans, int first_partial_plan, List *tlist, List *partitioned_rels) { Append *node = makeNode(Append); Plan *plan = &node->plan; @@ -5296,6 +5298,7 @@ make_append(List *appendplans, List *tlist, List *partitioned_rels) plan->righttree = NULL; node->partitioned_rels = partitioned_rels; node->appendplans = appendplans; + node->first_partial_plan = first_partial_plan; return node; } diff --git a/src/backend/optimizer/plan/planner.c b/src/backend/optimizer/plan/planner.c index ecdd728..446d6f6 100644 --- a/src/backend/optimizer/plan/planner.c +++ b/src/backend/optimizer/plan/planner.c @@ -3655,8 +3655,10 @@ create_grouping_paths(PlannerInfo *root, path = (Path *) create_append_path(grouped_rel, paths, + NIL, NULL, 0, + false, NIL); path->pathtarget = target; } diff --git a/src/backend/optimizer/prep/prepunion.c b/src/backend/optimizer/prep/prepunion.c index 1c84a2c..6ea7029 100644 --- a/src/backend/optimizer/prep/prepunion.c +++ b/src/backend/optimizer/prep/prepunion.c @@ -590,8 +590,8 @@ generate_union_path(SetOperationStmt *op, PlannerInfo *root, /* * Append the child results together. */ - path = (Path *) create_append_path(result_rel, pathlist, NULL, 0, NIL); - + path = (Path *) create_append_path(result_rel, pathlist, NIL, + NULL, 0, false, NIL); /* We have to manually jam the right tlist into the path; ick */ path->pathtarget = create_pathtarget(root, tlist); @@ -702,7 +702,8 @@ generate_nonunion_path(SetOperationStmt *op, PlannerInfo *root, /* * Append the child results together. 
 	 */
-	path = (Path *) create_append_path(result_rel, pathlist, NULL, 0, NIL);
+	path = (Path *) create_append_path(result_rel, pathlist, NIL,
+									   NULL, 0, false, NIL);
 
 	/* We have to manually jam the right tlist into the path; ick */
 	path->pathtarget = create_pathtarget(root, tlist);
diff --git a/src/backend/optimizer/util/pathnode.c b/src/backend/optimizer/util/pathnode.c
index 2d491eb..454a09d 100644
--- a/src/backend/optimizer/util/pathnode.c
+++ b/src/backend/optimizer/util/pathnode.c
@@ -51,6 +51,8 @@ typedef enum
 #define STD_FUZZ_FACTOR 1.01
 
 static List *translate_sub_tlist(List *tlist, int relid);
+static int	append_total_cost_compare(const void *a, const void *b);
+static int	append_startup_cost_compare(const void *a, const void *b);
 static List *reparameterize_pathlist_by_child(PlannerInfo *root,
 								 List *pathlist,
 								 RelOptInfo *child_rel);
@@ -1201,6 +1203,82 @@ create_tidscan_path(PlannerInfo *root, RelOptInfo *rel, List *tidquals,
 }
 
 /*
+ * get_append_num_workers
+ *	  Return the number of workers to request for a partial Append path.
+ */
+int
+get_append_num_workers(List *partial_subpaths,
+					   List *nonpartial_subpaths,
+					   bool parallel_aware)
+{
+	ListCell   *lc;
+	double		log2w;
+	int			num_workers;
+	int			max_per_plan_workers;
+
+	/*
+	 * The log2(number_of_subpaths) + 1 formula gives an appropriate number
+	 * of workers for an Append path that has a high number of children
+	 * (> 100), or has only non-partial subpaths, or has subpaths with just
+	 * 1-2 parallel workers each. However, when the subpaths'
+	 * parallel_workers values are high, this formula is not suitable,
+	 * because it does not take the per-subpath workers into account. For
+	 * example, with 3 subplans having per-subplan workers of (2, 8, 8), the
+	 * Append should get at least 8 workers, whereas the formula gives 2. In
+	 * that case, it seems better to follow the method used for calculating
+	 * the parallel_workers of an unpartitioned table: log3(table_size). So
+	 * we treat the partitioned table as if its data belonged to a single
+	 * unpartitioned table and derive the worker count from that, which
+	 * gives logb(b^w1 + b^w2 + b^w3), where w1, w2, ... are the per-subplan
+	 * workers and b is some logarithmic base such as 2 or 3. That
+	 * expression evaluates to a value just a bit greater than
+	 * max(w1, w2, w3), so we simply use the maximum of the per-subplan
+	 * workers. But this second formula gives too few workers when all the
+	 * paths have a single worker (meaning they are non-partial): for
+	 * example, with workers of (1, 1, 1, 1, 1, 1) it is better to allocate
+	 * 3 workers, whereas it allocates only 1. Therefore, we use whichever
+	 * of the two methods gives the higher number of workers.
+	 */
+
+	/* Get log2(num_subpaths) */
+	log2w = fls(list_length(partial_subpaths) +
+				list_length(nonpartial_subpaths));
+
+	/* Avoid further calculations if we already crossed max workers limit */
+	if (max_parallel_workers_per_gather <= log2w + 1)
+		return max_parallel_workers_per_gather;
+
+	/*
+	 * Get the parallel_workers value of the partial subpath having the
+	 * highest parallel_workers.
+	 */
+	max_per_plan_workers = 1;
+	foreach(lc, partial_subpaths)
+	{
+		Path	   *subpath = lfirst(lc);
+
+		max_per_plan_workers = Max(max_per_plan_workers,
+								   subpath->parallel_workers);
+	}
+
+	/*
+	 * For a non-parallel-aware Append, all workers run a common subplan at
+	 * a time, so it makes sense to just choose the per-subplan maximum as
+	 * the number of Append workers. For a parallel-aware Append, choose the
+	 * higher of the results of the two formulae.
+	 */
+	num_workers = (parallel_aware ?
+				   rint(Max(log2w, max_per_plan_workers) + 1) :
+				   max_per_plan_workers);
+
+	/* In no case use more than max_parallel_workers_per_gather workers. */
+	num_workers = Min(num_workers, max_parallel_workers_per_gather);
+
+	Assert(num_workers > 0);
+
+	return num_workers;
+}
+
+/*
  * create_append_path
  *	  Creates a path corresponding to an Append plan, returning the
  *	  pathnode.
@@ -1208,8 +1286,11 @@ create_tidscan_path(PlannerInfo *root, RelOptInfo *rel, List *tidquals,
  * Note that we must handle subpaths = NIL, representing a dummy access path.
  */
 AppendPath *
-create_append_path(RelOptInfo *rel, List *subpaths, Relids required_outer,
-				   int parallel_workers, List *partitioned_rels)
+create_append_path(RelOptInfo *rel,
+				   List *subpaths, List *partial_subpaths,
+				   Relids required_outer,
+				   int parallel_workers, bool parallel_aware,
+				   List *partitioned_rels)
 {
 	AppendPath *pathnode = makeNode(AppendPath);
 	ListCell   *l;
@@ -1219,44 +1300,83 @@ create_append_path(RelOptInfo *rel, List *subpaths, Relids required_outer,
 	pathnode->path.pathtarget = rel->reltarget;
 	pathnode->path.param_info = get_appendrel_parampathinfo(rel,
 															required_outer);
-	pathnode->path.parallel_aware = false;
+	pathnode->path.parallel_aware = (parallel_aware && parallel_workers > 0);
 	pathnode->path.parallel_safe = rel->consider_parallel;
 	pathnode->path.parallel_workers = parallel_workers;
 	pathnode->path.pathkeys = NIL;	/* result is always considered unsorted */
 	pathnode->partitioned_rels = list_copy(partitioned_rels);
-	pathnode->subpaths = subpaths;
 
-	/*
-	 * We don't bother with inventing a cost_append(), but just do it here.
-	 *
-	 * Compute rows and costs as sums of subplan rows and costs. We charge
-	 * nothing extra for the Append itself, which perhaps is too optimistic,
-	 * but since it doesn't do any selection or projection, it is a pretty
-	 * cheap node.
+	/*
+	 * For a parallel append, the non-partial paths are sorted by descending
+	 * total cost; that way, the total time to finish all the non-partial
+	 * paths is minimized. Similarly, the partial paths are sorted by
+	 * descending startup cost: there may be paths that require startup work
+	 * to be done by a single worker, and in such cases it is better for the
+	 * workers to pick the expensive ones first, whereas the leader should
+	 * pick the plan with the cheapest startup cost.
 	 */
-	pathnode->path.rows = 0;
-	pathnode->path.startup_cost = 0;
-	pathnode->path.total_cost = 0;
+	if (pathnode->path.parallel_aware)
+	{
+		subpaths = list_qsort(subpaths, append_total_cost_compare);
+		partial_subpaths = list_qsort(partial_subpaths,
+									  append_startup_cost_compare);
+	}
+	pathnode->first_partial_path = list_length(subpaths);
+	pathnode->subpaths = list_concat(subpaths, partial_subpaths);
+
 	foreach(l, subpaths)
 	{
 		Path	   *subpath = (Path *) lfirst(l);
 
-		pathnode->path.rows += subpath->rows;
-
-		if (l == list_head(subpaths))	/* first node?
*/ - pathnode->path.startup_cost = subpath->startup_cost; - pathnode->path.total_cost += subpath->total_cost; pathnode->path.parallel_safe = pathnode->path.parallel_safe && - subpath->parallel_safe; + subpath->parallel_safe; /* All child paths must have same parameterization */ Assert(bms_equal(PATH_REQ_OUTER(subpath), required_outer)); } + cost_append(&pathnode->path, pathnode->subpaths, + pathnode->first_partial_path); + return pathnode; } /* + * append_total_cost_compare + * list_qsort comparator for sorting append child paths by total_cost + */ +static int +append_total_cost_compare(const void *a, const void *b) +{ + Path *path1 = (Path *) lfirst(*(ListCell **) a); + Path *path2 = (Path *) lfirst(*(ListCell **) b); + + if (path1->total_cost > path2->total_cost) + return -1; + if (path1->total_cost < path2->total_cost) + return 1; + + return 0; +} + +/* + * append_startup_cost_compare + * list_qsort comparator for sorting append child paths by startup_cost + */ +static int +append_startup_cost_compare(const void *a, const void *b) +{ + Path *path1 = (Path *) lfirst(*(ListCell **) a); + Path *path2 = (Path *) lfirst(*(ListCell **) b); + + if (path1->startup_cost > path2->startup_cost) + return -1; + if (path1->startup_cost < path2->startup_cost) + return 1; + + return 0; +} + +/* * create_merge_append_path * Creates a path corresponding to a MergeAppend plan, returning the * pathnode. diff --git a/src/backend/storage/lmgr/lwlock.c b/src/backend/storage/lmgr/lwlock.c index f1060f9..f2b4474 100644 --- a/src/backend/storage/lmgr/lwlock.c +++ b/src/backend/storage/lmgr/lwlock.c @@ -517,6 +517,7 @@ RegisterLWLockTranches(void) LWLockRegisterTranche(LWTRANCHE_SESSION_TYPMOD_TABLE, "session_typmod_table"); LWLockRegisterTranche(LWTRANCHE_TBM, "tbm"); + LWLockRegisterTranche(LWTRANCHE_PARALLEL_APPEND, "parallel_append"); /* Register named tranches. 
*/ for (i = 0; i < NamedLWLockTrancheRequests; i++) diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c index ae22185..5251259 100644 --- a/src/backend/utils/misc/guc.c +++ b/src/backend/utils/misc/guc.c @@ -920,6 +920,15 @@ static struct config_bool ConfigureNamesBool[] = false, NULL, NULL, NULL }, + { + {"enable_parallel_append", PGC_USERSET, QUERY_TUNING_METHOD, + gettext_noop("Enables the planner's use of parallel append plans."), + NULL + }, + &enable_parallel_append, + true, + NULL, NULL, NULL + }, { {"geqo", PGC_USERSET, QUERY_TUNING_GEQO, diff --git a/src/backend/utils/misc/postgresql.conf.sample b/src/backend/utils/misc/postgresql.conf.sample index 368b280..87c54f0 100644 --- a/src/backend/utils/misc/postgresql.conf.sample +++ b/src/backend/utils/misc/postgresql.conf.sample @@ -296,6 +296,7 @@ #enable_material = on #enable_mergejoin = on #enable_nestloop = on +#enable_parallel_append = on #enable_seqscan = on #enable_sort = on #enable_tidscan = on diff --git a/src/include/executor/nodeAppend.h b/src/include/executor/nodeAppend.h index 4e38a13..78e5943 100644 --- a/src/include/executor/nodeAppend.h +++ b/src/include/executor/nodeAppend.h @@ -14,10 +14,15 @@ #ifndef NODEAPPEND_H #define NODEAPPEND_H +#include "access/parallel.h" #include "nodes/execnodes.h" extern AppendState *ExecInitAppend(Append *node, EState *estate, int eflags); extern void ExecEndAppend(AppendState *node); extern void ExecReScanAppend(AppendState *node); +extern void ExecAppendEstimate(AppendState *node, ParallelContext *pcxt); +extern void ExecAppendInitializeDSM(AppendState *node, ParallelContext *pcxt); +extern void ExecAppendReInitializeDSM(AppendState *node, ParallelContext *pcxt); +extern void ExecAppendInitializeWorker(AppendState *node, shm_toc *toc); #endif /* NODEAPPEND_H */ diff --git a/src/include/nodes/execnodes.h b/src/include/nodes/execnodes.h index 52d3532..d23ff47 100644 --- a/src/include/nodes/execnodes.h +++ b/src/include/nodes/execnodes.h @@ -21,6 +21,7 @@ #include "lib/pairingheap.h" #include "nodes/params.h" #include "nodes/plannodes.h" +#include "storage/spin.h" #include "utils/hsearch.h" #include "utils/queryenvironment.h" #include "utils/reltrigger.h" @@ -998,12 +999,16 @@ typedef struct ModifyTableState * whichplan which plan is being executed (0 .. 
n-1) * ---------------- */ +struct ParallelAppendDescData; typedef struct AppendState { PlanState ps; /* its first field is NodeTag */ PlanState **appendplans; /* array of PlanStates for my inputs */ int as_nplans; int as_whichplan; + bool is_parallel_worker; + struct ParallelAppendDescData *as_padesc; /* parallel coordination info */ + Size pappend_len; /* size of parallel coordination info */ } AppendState; /* ---------------- diff --git a/src/include/nodes/pg_list.h b/src/include/nodes/pg_list.h index 667d5e2..711db92 100644 --- a/src/include/nodes/pg_list.h +++ b/src/include/nodes/pg_list.h @@ -269,6 +269,9 @@ extern void list_free_deep(List *list); extern List *list_copy(const List *list); extern List *list_copy_tail(const List *list, int nskip); +typedef int (*list_qsort_comparator) (const void *a, const void *b); +extern List *list_qsort(const List *list, list_qsort_comparator cmp); + /* * To ease migration to the new list API, a set of compatibility * macros are provided that reduce the impact of the list API changes diff --git a/src/include/nodes/plannodes.h b/src/include/nodes/plannodes.h index dd74efa..08d486f 100644 --- a/src/include/nodes/plannodes.h +++ b/src/include/nodes/plannodes.h @@ -248,6 +248,7 @@ typedef struct Append /* RT indexes of non-leaf tables in a partition tree */ List *partitioned_rels; List *appendplans; + int first_partial_plan; } Append; /* ---------------- diff --git a/src/include/nodes/relation.h b/src/include/nodes/relation.h index e085cef..ec5da88 100644 --- a/src/include/nodes/relation.h +++ b/src/include/nodes/relation.h @@ -1255,6 +1255,9 @@ typedef struct CustomPath * AppendPath represents an Append plan, ie, successive execution of * several member plans. * + * For partial Append, 'subpaths' contains non-partial subpaths followed by + * partial subpaths. + * * Note: it is possible for "subpaths" to contain only one, or even no, * elements. These cases are optimized during create_append_plan. 
* In particular, an AppendPath with no subpaths is a "dummy" path that @@ -1266,6 +1269,9 @@ typedef struct AppendPath /* RT indexes of non-leaf tables in a partition tree */ List *partitioned_rels; List *subpaths; /* list of component Paths */ + + /* Index of first partial path in subpaths */ + int first_partial_path; } AppendPath; #define IS_DUMMY_PATH(p) \ diff --git a/src/include/optimizer/cost.h b/src/include/optimizer/cost.h index 306d923..7d3a547 100644 --- a/src/include/optimizer/cost.h +++ b/src/include/optimizer/cost.h @@ -68,6 +68,7 @@ extern bool enable_mergejoin; extern bool enable_hashjoin; extern bool enable_gathermerge; extern bool enable_partition_wise_join; +extern bool enable_parallel_append; extern int constraint_exclusion; extern double clamp_row_est(double nrows); @@ -106,6 +107,8 @@ extern void cost_sort(Path *path, PlannerInfo *root, List *pathkeys, Cost input_cost, double tuples, int width, Cost comparison_cost, int sort_mem, double limit_tuples); +extern void cost_append(Path *path, List *subpaths, + int num_nonpartial_subpaths); extern void cost_merge_append(Path *path, PlannerInfo *root, List *pathkeys, int n_streams, Cost input_startup_cost, Cost input_total_cost, diff --git a/src/include/optimizer/pathnode.h b/src/include/optimizer/pathnode.h index e9ed16a..51b5096 100644 --- a/src/include/optimizer/pathnode.h +++ b/src/include/optimizer/pathnode.h @@ -14,6 +14,7 @@ #ifndef PATHNODE_H #define PATHNODE_H +#include "nodes/bitmapset.h" #include "nodes/relation.h" @@ -63,9 +64,14 @@ extern BitmapOrPath *create_bitmap_or_path(PlannerInfo *root, List *bitmapquals); extern TidPath *create_tidscan_path(PlannerInfo *root, RelOptInfo *rel, List *tidquals, Relids required_outer); -extern AppendPath *create_append_path(RelOptInfo *rel, List *subpaths, - Relids required_outer, int parallel_workers, - List *partitioned_rels); +extern int get_append_num_workers(List *partial_subpaths, + List *nonpartial_subpaths, + bool parallel_aware); +extern AppendPath *create_append_path(RelOptInfo *rel, + List *subpaths, List *partial_subpaths, + Relids required_outer, + int parallel_workers, bool parallel_aware, + List *partitioned_rels); extern MergeAppendPath *create_merge_append_path(PlannerInfo *root, RelOptInfo *rel, List *subpaths, diff --git a/src/include/storage/lwlock.h b/src/include/storage/lwlock.h index f4c4aed..e190155 100644 --- a/src/include/storage/lwlock.h +++ b/src/include/storage/lwlock.h @@ -216,6 +216,7 @@ typedef enum BuiltinTrancheIds LWTRANCHE_SESSION_RECORD_TABLE, LWTRANCHE_SESSION_TYPMOD_TABLE, LWTRANCHE_TBM, + LWTRANCHE_PARALLEL_APPEND, LWTRANCHE_FIRST_USER_DEFINED } BuiltinTrancheIds; diff --git a/src/test/regress/expected/inherit.out b/src/test/regress/expected/inherit.out index c698faf..9692155 100644 --- a/src/test/regress/expected/inherit.out +++ b/src/test/regress/expected/inherit.out @@ -1404,6 +1404,7 @@ select min(1-id) from matest0; reset enable_indexscan; set enable_seqscan = off; -- plan with fewest seqscans should be merge +set enable_parallel_append = off; -- Don't let parallel-append interfere explain (verbose, costs off) select * from matest0 order by 1-id; QUERY PLAN ------------------------------------------------------------------ @@ -1470,6 +1471,7 @@ select min(1-id) from matest0; (1 row) reset enable_seqscan; +reset enable_parallel_append; drop table matest0 cascade; NOTICE: drop cascades to 3 other objects DETAIL: drop cascades to table matest1 diff --git a/src/test/regress/expected/select_parallel.out 
b/src/test/regress/expected/select_parallel.out index 2ae600f..b4cf7cb 100644 --- a/src/test/regress/expected/select_parallel.out +++ b/src/test/regress/expected/select_parallel.out @@ -11,13 +11,38 @@ set parallel_setup_cost=0; set parallel_tuple_cost=0; set min_parallel_table_scan_size=0; set max_parallel_workers_per_gather=4; +-- test Parallel Append. explain (costs off) - select count(*) from a_star; + select round(avg(aa)), sum(aa) from a_star; QUERY PLAN ----------------------------------------------------- Finalize Aggregate -> Gather - Workers Planned: 1 + Workers Planned: 4 + -> Partial Aggregate + -> Parallel Append + -> Parallel Seq Scan on a_star + -> Parallel Seq Scan on b_star + -> Parallel Seq Scan on c_star + -> Parallel Seq Scan on d_star + -> Parallel Seq Scan on e_star + -> Parallel Seq Scan on f_star +(11 rows) + +select round(avg(aa)), sum(aa) from a_star; + round | sum +-------+----- + 14 | 355 +(1 row) + +set enable_parallel_append to off; +explain (costs off) + select round(avg(aa)), sum(aa) from a_star; + QUERY PLAN +----------------------------------------------------- + Finalize Aggregate + -> Gather + Workers Planned: 4 -> Partial Aggregate -> Append -> Parallel Seq Scan on a_star @@ -28,12 +53,63 @@ explain (costs off) -> Parallel Seq Scan on f_star (11 rows) -select count(*) from a_star; - count -------- - 50 +select round(avg(aa)), sum(aa) from a_star; + round | sum +-------+----- + 14 | 355 +(1 row) + +set enable_parallel_append to on; +-- Mix of partial and non-partial subplans. +alter table c_star set (parallel_workers = 0); +alter table d_star set (parallel_workers = 0); +explain (costs off) + select round(avg(aa)), sum(aa) from a_star; + QUERY PLAN +----------------------------------------------------- + Finalize Aggregate + -> Gather + Workers Planned: 4 + -> Partial Aggregate + -> Parallel Append + -> Seq Scan on d_star + -> Seq Scan on c_star + -> Parallel Seq Scan on a_star + -> Parallel Seq Scan on b_star + -> Parallel Seq Scan on e_star + -> Parallel Seq Scan on f_star +(11 rows) + +select round(avg(aa)), sum(aa) from a_star; + round | sum +-------+----- + 14 | 355 +(1 row) + +set enable_parallel_append to off; +explain (costs off) + select round(avg(aa)), sum(aa) from a_star; + QUERY PLAN +-------------------------------- + Aggregate + -> Append + -> Seq Scan on a_star + -> Seq Scan on b_star + -> Seq Scan on c_star + -> Seq Scan on d_star + -> Seq Scan on e_star + -> Seq Scan on f_star +(8 rows) + +select round(avg(aa)), sum(aa) from a_star; + round | sum +-------+----- + 14 | 355 (1 row) +reset enable_parallel_append; +alter table c_star reset (parallel_workers); +alter table d_star reset (parallel_workers); -- test that parallel_restricted function doesn't run in worker alter table tenk1 set (parallel_workers = 4); explain (verbose, costs off) diff --git a/src/test/regress/expected/sysviews.out b/src/test/regress/expected/sysviews.out index cd1f7f3..2b738aa 100644 --- a/src/test/regress/expected/sysviews.out +++ b/src/test/regress/expected/sysviews.out @@ -81,11 +81,12 @@ select name, setting from pg_settings where name like 'enable%'; enable_material | on enable_mergejoin | on enable_nestloop | on + enable_parallel_append | on enable_partition_wise_join | off enable_seqscan | on enable_sort | on enable_tidscan | on -(13 rows) +(14 rows) -- Test that the pg_timezone_names and pg_timezone_abbrevs views are -- more-or-less working. 
We can't test their contents in any great detail diff --git a/src/test/regress/sql/inherit.sql b/src/test/regress/sql/inherit.sql index 169d0dc..3fafc5f 100644 --- a/src/test/regress/sql/inherit.sql +++ b/src/test/regress/sql/inherit.sql @@ -508,11 +508,13 @@ select min(1-id) from matest0; reset enable_indexscan; set enable_seqscan = off; -- plan with fewest seqscans should be merge +set enable_parallel_append = off; -- Don't let parallel-append interfere explain (verbose, costs off) select * from matest0 order by 1-id; select * from matest0 order by 1-id; explain (verbose, costs off) select min(1-id) from matest0; select min(1-id) from matest0; reset enable_seqscan; +reset enable_parallel_append; drop table matest0 cascade; diff --git a/src/test/regress/sql/select_parallel.sql b/src/test/regress/sql/select_parallel.sql index 89fe80a..8de082b 100644 --- a/src/test/regress/sql/select_parallel.sql +++ b/src/test/regress/sql/select_parallel.sql @@ -15,9 +15,28 @@ set parallel_tuple_cost=0; set min_parallel_table_scan_size=0; set max_parallel_workers_per_gather=4; -explain (costs off) - select count(*) from a_star; -select count(*) from a_star; +-- test Parallel Append. +explain (costs off) + select round(avg(aa)), sum(aa) from a_star; +select round(avg(aa)), sum(aa) from a_star; +set enable_parallel_append to off; +explain (costs off) + select round(avg(aa)), sum(aa) from a_star; +select round(avg(aa)), sum(aa) from a_star; +set enable_parallel_append to on; +-- Mix of partial and non-partial subplans. +alter table c_star set (parallel_workers = 0); +alter table d_star set (parallel_workers = 0); +explain (costs off) + select round(avg(aa)), sum(aa) from a_star; +select round(avg(aa)), sum(aa) from a_star; +set enable_parallel_append to off; +explain (costs off) + select round(avg(aa)), sum(aa) from a_star; +select round(avg(aa)), sum(aa) from a_star; +reset enable_parallel_append; +alter table c_star reset (parallel_workers); +alter table d_star reset (parallel_workers); -- test that parallel_restricted function doesn't run in worker alter table tenk1 set (parallel_workers = 4);
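
For quick verification (illustrative only; this mirrors the new
select_parallel tests above, and actual worker counts depend on
max_parallel_workers_per_gather and the costing GUCs):

    set enable_parallel_append = on;
    explain (costs off)
      select round(avg(aa)), sum(aa) from a_star;

should show a "Parallel Append" node under "Gather", with a mix of
"Parallel Seq Scan" and plain "Seq Scan" children once some of the tables
have parallel_workers = 0 set.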