diff --git a/doc/src/sgml/config.sgml b/doc/src/sgml/config.sgml index 2b6255e..2ad1397 100644 --- a/doc/src/sgml/config.sgml +++ b/doc/src/sgml/config.sgml @@ -3679,6 +3679,20 @@ ANY num_sync ( + enable_parallelappend (boolean) + + enable_parallelappend configuration parameter + + + + + Enables or disables the query planner's use of parallel-aware + append plan types. The default is on. + + + + enable_seqscan (boolean) diff --git a/doc/src/sgml/monitoring.sgml b/doc/src/sgml/monitoring.sgml index 5575c2c..b2fa245 100644 --- a/doc/src/sgml/monitoring.sgml +++ b/doc/src/sgml/monitoring.sgml @@ -845,7 +845,7 @@ postgres 27093 0.0 0.0 30096 2752 ? Ss 11:34 0:00 postgres: ser - LWLock + LWLock ShmemIndexLock Waiting to find or allocate space in shared memory. @@ -1109,6 +1109,11 @@ postgres 27093 0.0 0.0 30096 2752 ? Ss 11:34 0:00 postgres: ser Waiting for TBM shared iterator lock. + parallel_append + Waiting to choose the next subplan during Parallel Append plan + execution. + + Lock relation Waiting to acquire a lock on a relation. diff --git a/src/backend/executor/execParallel.c b/src/backend/executor/execParallel.c index c713b85..df0acdb 100644 --- a/src/backend/executor/execParallel.c +++ b/src/backend/executor/execParallel.c @@ -25,6 +25,7 @@ #include "executor/execParallel.h" #include "executor/executor.h" +#include "executor/nodeAppend.h" #include "executor/nodeBitmapHeapscan.h" #include "executor/nodeCustom.h" #include "executor/nodeForeignscan.h" @@ -244,6 +245,11 @@ ExecParallelEstimate(PlanState *planstate, ExecParallelEstimateContext *e) ExecForeignScanEstimate((ForeignScanState *) planstate, e->pcxt); break; + case T_AppendState: + if (planstate->plan->parallel_aware) + ExecAppendEstimate((AppendState *) planstate, + e->pcxt); + break; case T_CustomScanState: if (planstate->plan->parallel_aware) ExecCustomScanEstimate((CustomScanState *) planstate, @@ -316,6 +322,11 @@ ExecParallelInitializeDSM(PlanState *planstate, ExecForeignScanInitializeDSM((ForeignScanState *) planstate, d->pcxt); break; + case T_AppendState: + if (planstate->plan->parallel_aware) + ExecAppendInitializeDSM((AppendState *) planstate, + d->pcxt); + break; case T_CustomScanState: if (planstate->plan->parallel_aware) ExecCustomScanInitializeDSM((CustomScanState *) planstate, @@ -908,6 +919,10 @@ ExecParallelInitializeWorker(PlanState *planstate, shm_toc *toc) ExecForeignScanInitializeWorker((ForeignScanState *) planstate, toc); break; + case T_AppendState: + if (planstate->plan->parallel_aware) + ExecAppendInitializeWorker((AppendState *) planstate, toc); + break; case T_CustomScanState: if (planstate->plan->parallel_aware) ExecCustomScanInitializeWorker((CustomScanState *) planstate, diff --git a/src/backend/executor/nodeAppend.c b/src/backend/executor/nodeAppend.c index bed9bb8..11f9688 100644 --- a/src/backend/executor/nodeAppend.c +++ b/src/backend/executor/nodeAppend.c @@ -60,10 +60,46 @@ #include "executor/execdebug.h" #include "executor/nodeAppend.h" #include "miscadmin.h" +#include "optimizer/cost.h" +#include "storage/spin.h" -static TupleTableSlot *ExecAppend(PlanState *pstate); -static bool exec_append_initialize_next(AppendState *appendstate); +/* + * Shared state for Parallel Append. + * + * Each backend participating in a Parallel Append has its own + * descriptor in backend-private memory, and those objects all contain + * a pointer to this structure. 
+ */
+typedef struct ParallelAppendDescData
+{
+	LWLock		pa_lock;		/* mutual exclusion to choose next subplan */
+	int			pa_first_plan;	/* plan to choose while wrapping around plans */
+	int			pa_next_plan;	/* next plan to choose by any worker */
+
+	/*
+	 * pa_finished[i] is set to true when subplan i needs no more workers: a
+	 * worker that finishes a subplan sets its flag so that no new worker
+	 * picks that subplan.  For a non-partial subplan, the worker that picks
+	 * it up sets the flag immediately, to make sure no more than one worker
+	 * is ever assigned to it.
+	 */
+	bool		pa_finished[FLEXIBLE_ARRAY_MEMBER];
+} ParallelAppendDescData;
+
+typedef ParallelAppendDescData *ParallelAppendDesc;
+
+/*
+ * Special value of AppendState->as_whichplan for Parallel Append, which
+ * indicates there are no plans left to be executed.
+ */
+#define PA_INVALID_PLAN -1
+static TupleTableSlot *ExecAppend(PlanState *pstate);
+static bool exec_append_seq_next(AppendState *appendstate);
+static bool exec_append_parallel_next(AppendState *state);
+static bool exec_append_leader_next(AppendState *state);
+static int	exec_append_get_next_plan(int curplan, int first_plan,
+							  int last_plan);
 
 /* ----------------------------------------------------------------
  *		exec_append_initialize_next
@@ -74,11 +110,20 @@ static bool exec_append_initialize_next(AppendState *appendstate);
  * ----------------------------------------------------------------
  */
 static bool
-exec_append_initialize_next(AppendState *appendstate)
+exec_append_seq_next(AppendState *appendstate)
 {
 	int			whichplan;
 
 	/*
+	 * Not parallel-aware.  Just go on to the next subplan in the
+	 * appropriate direction.
+	 */
+	if (ScanDirectionIsForward(appendstate->ps.state->es_direction))
+		appendstate->as_whichplan++;
+	else
+		appendstate->as_whichplan--;
+
+	/*
 	 * get information from the append node
 	 */
 	whichplan = appendstate->as_whichplan;
@@ -185,10 +230,10 @@ ExecInitAppend(Append *node, EState *estate, int eflags)
 	appendstate->ps.ps_ProjInfo = NULL;
 
 	/*
-	 * initialize to scan first subplan
+	 * Initialize to scan first subplan (but note that we'll override this
+	 * later in the case of a parallel append).
 	 */
 	appendstate->as_whichplan = 0;
-	exec_append_initialize_next(appendstate);
 
 	return appendstate;
 }
@@ -204,6 +249,14 @@ ExecAppend(PlanState *pstate)
 {
 	AppendState *node = castNode(AppendState, pstate);
 
+	/*
+	 * Check whether Parallel Append has already run out of subplans.  This
+	 * can happen if all the subplans were finished before this worker even
+	 * started returning tuples.
+	 */
+	if (node->as_padesc && node->as_whichplan == PA_INVALID_PLAN)
+		return ExecClearTuple(node->ps.ps_ResultTupleSlot);
+
 	for (;;)
 	{
 		PlanState  *subnode;
@@ -232,16 +285,31 @@ ExecAppend(PlanState *pstate)
 		}
 
 		/*
-		 * Go on to the "next" subplan in the appropriate direction. If no
-		 * more subplans, return the empty slot set up for us by
-		 * ExecInitAppend.
+		 * Go on to the "next" subplan. If no more subplans, return the empty
+		 * slot set up for us by ExecInitAppend.
 		 */
-		if (ScanDirectionIsForward(node->ps.state->es_direction))
-			node->as_whichplan++;
+		if (!node->as_padesc)
+		{
+			/*
+			 * Not parallel-aware.  Just advance to the next subplan in the
+			 * appropriate direction.
+			 */
+			if (!exec_append_seq_next(node))
+				return ExecClearTuple(node->ps.ps_ResultTupleSlot);
+		}
 		else
-			node->as_whichplan--;
-		if (!exec_append_initialize_next(node))
-			return ExecClearTuple(node->ps.ps_ResultTupleSlot);
+		{
+			/*
+			 * We are done with this subplan. 
There might be other workers + * still processing the last chunk of rows for this same subplan, + * but there's no point for new workers to run this subplan, so + * mark this subplan as finished. + */ + node->as_padesc->pa_finished[node->as_whichplan] = true; + + if (!exec_append_parallel_next(node)) + return ExecClearTuple(node->ps.ps_ResultTupleSlot); + } /* Else loop back and try to get a tuple from the new subplan */ } @@ -279,6 +347,7 @@ void ExecReScanAppend(AppendState *node) { int i; + ParallelAppendDesc padesc = node->as_padesc; for (i = 0; i < node->as_nplans; i++) { @@ -298,6 +367,276 @@ ExecReScanAppend(AppendState *node) if (subnode->chgParam == NULL) ExecReScan(subnode); } + + if (padesc) + { + padesc->pa_first_plan = padesc->pa_next_plan = 0; + memset(padesc->pa_finished, 0, sizeof(bool) * node->as_nplans); + } + node->as_whichplan = 0; - exec_append_initialize_next(node); +} + +/* ---------------------------------------------------------------- + * Parallel Append Support + * ---------------------------------------------------------------- + */ + +/* ---------------------------------------------------------------- + * ExecAppendEstimate + * + * estimates the space required to serialize Append node. + * ---------------------------------------------------------------- + */ +void +ExecAppendEstimate(AppendState *node, + ParallelContext *pcxt) +{ + node->pappend_len = + add_size(offsetof(struct ParallelAppendDescData, pa_finished), + sizeof(bool) * node->as_nplans); + + shm_toc_estimate_chunk(&pcxt->estimator, node->pappend_len); + shm_toc_estimate_keys(&pcxt->estimator, 1); +} + + +/* ---------------------------------------------------------------- + * ExecAppendInitializeDSM + * + * Set up a Parallel Append descriptor. + * ---------------------------------------------------------------- + */ +void +ExecAppendInitializeDSM(AppendState *node, + ParallelContext *pcxt) +{ + ParallelAppendDesc padesc; + + padesc = shm_toc_allocate(pcxt->toc, node->pappend_len); + + /* + * Just setting all the fields to 0 is enough. The logic of choosing the + * next plan in workers will take care of everything else. + */ + memset(padesc, 0, sizeof(ParallelAppendDescData)); + memset(padesc->pa_finished, 0, sizeof(bool) * node->as_nplans); + + LWLockInitialize(&padesc->pa_lock, LWTRANCHE_PARALLEL_APPEND); + + node->as_padesc = padesc; + + shm_toc_insert(pcxt->toc, node->ps.plan->plan_node_id, padesc); + + /* Choose the first subplan to be executed. */ + (void) exec_append_parallel_next(node); +} + +/* ---------------------------------------------------------------- + * ExecAppendInitializeWorker + * + * Copy relevant information from TOC into planstate, and initialize + * whatever is required to choose and execute the optimal subplan. + * ---------------------------------------------------------------- + */ +void +ExecAppendInitializeWorker(AppendState *node, shm_toc *toc) +{ + node->as_padesc = shm_toc_lookup(toc, node->ps.plan->plan_node_id, false); + + /* Choose the first subplan to be executed. */ + (void) exec_append_parallel_next(node); +} + +/* ---------------------------------------------------------------- + * exec_append_parallel_next + * + * Determine the next subplan that should be executed. Each worker uses a + * shared next_subplan counter index to start looking for unfinished plan, + * executes the subplan, then shifts ahead this counter to the next + * subplan, so that other workers know which next plan to choose. 
This + * way, workers choose the subplans in round robin order, and thus they + * get evenly distributed among the subplans. + * + * Returns false if and only if all subplans are already finished + * processing. + * ---------------------------------------------------------------- + */ +static bool +exec_append_parallel_next(AppendState *state) +{ + ParallelAppendDesc padesc = state->as_padesc; + int whichplan; + int initial_plan; + int first_partial_plan = ((Append *)state->ps.plan)->first_partial_plan; + bool found; + + Assert(padesc != NULL); + + /* Backward scan is not supported by parallel-aware plans */ + Assert(ScanDirectionIsForward(state->ps.state->es_direction)); + + /* The parallel leader chooses its next subplan differently */ + if (!IsParallelWorker()) + return exec_append_leader_next(state); + + LWLockAcquire(&padesc->pa_lock, LW_EXCLUSIVE); + + /* Make a note of which subplan we have started with */ + initial_plan = padesc->pa_next_plan; + + /* + * Keep going to the next plan until we find an unfinished one. In the + * process, also keep track of the first unfinished non-partial subplan. As + * the non-partial subplans are taken one by one, the first unfinished + * subplan index will shift ahead, so that we don't have to visit the + * finished non-partial ones anymore. + */ + + found = false; + for (whichplan = initial_plan; whichplan != PA_INVALID_PLAN;) + { + /* + * Ignore plans that are already done processing. These also include + * non-partial subplans which have already been taken by a worker. + */ + if (!padesc->pa_finished[whichplan]) + { + found = true; + break; + } + + /* + * Note: There is a chance that just after the child plan node is + * chosen above, some other worker finishes this node and sets + * pa_finished to true. In that case, this worker will go ahead and + * call ExecProcNode(child_node), which will return NULL tuple since it + * is already finished, and then once again this worker will try to + * choose next subplan; but this is ok : it's just an extra + * "choose_next_subplan" operation. + */ + + /* Either go to the next plan, or wrap around to the first one */ + whichplan = exec_append_get_next_plan(whichplan, padesc->pa_first_plan, + state->as_nplans - 1); + + /* + * If we have wrapped around and returned to the same index again, we + * are done scanning. + */ + if (whichplan == initial_plan) + break; + } + + if (!found) + { + /* + * We didn't find any plan to execute, stop executing, and indicate + * the same for other workers to know that there is no next plan. + */ + padesc->pa_next_plan = state->as_whichplan = PA_INVALID_PLAN; + } + else + { + /* + * If this a non-partial plan, immediately mark it finished, and shift + * ahead pa_first_plan. + */ + if (whichplan < first_partial_plan) + { + padesc->pa_finished[whichplan] = true; + padesc->pa_first_plan = whichplan + 1; + } + + /* + * Set the chosen plan, and the next plan to be picked by other + * workers. + */ + state->as_whichplan = whichplan; + padesc->pa_next_plan = exec_append_get_next_plan(whichplan, + padesc->pa_first_plan, + state->as_nplans - 1); + } + + LWLockRelease(&padesc->pa_lock); + + return found; +} + +/* ---------------------------------------------------------------- + * exec_append_leader_next + * + * To be used only if it's a parallel leader. The backend should scan + * backwards from the last plan. This is to prevent it from taking up + * the most expensive non-partial plan, i.e. the first subplan. 
+ * ----------------------------------------------------------------
+ */
+static bool
+exec_append_leader_next(AppendState *state)
+{
+	ParallelAppendDesc padesc = state->as_padesc;
+	int			first_plan;
+	int			whichplan;
+	int			first_partial_plan = ((Append *)state->ps.plan)->first_partial_plan;
+
+	LWLockAcquire(&padesc->pa_lock, LW_EXCLUSIVE);
+
+	/* The parallel leader should start from the last subplan. */
+	first_plan = padesc->pa_first_plan;
+
+	for (whichplan = state->as_nplans - 1; whichplan >= first_plan;
+		 whichplan--)
+	{
+		if (!padesc->pa_finished[whichplan])
+		{
+			/* If this is a non-partial plan, immediately mark it finished */
+			if (whichplan < first_partial_plan)
+				padesc->pa_finished[whichplan] = true;
+
+			break;
+		}
+	}
+
+	LWLockRelease(&padesc->pa_lock);
+
+	/* Return false only if we didn't find any plan to execute */
+	if (whichplan < first_plan)
+	{
+		state->as_whichplan = PA_INVALID_PLAN;
+		return false;
+	}
+	else
+	{
+		state->as_whichplan = whichplan;
+		return true;
+	}
+}
+
+/* ----------------------------------------------------------------
+ *		exec_append_get_next_plan
+ *
+ * Either advance to the next index, or wrap around to first_plan (the first
+ * plan that may still be unfinished). Returns the chosen index, or
+ * PA_INVALID_PLAN if, when wrapping around, first_plan is already at or past
+ * the last plan.
+ * ----------------------------------------------------------------
+ */
+static int
+exec_append_get_next_plan(int curplan, int first_plan, int last_plan)
+{
+	Assert(curplan <= last_plan);
+
+	if (curplan < last_plan)
+		return curplan + 1;
+	else
+	{
+		/*
+		 * We are already at the last plan. If first_plan itself is the last
+		 * plan or past it, there is no next plan remaining, so return
+		 * PA_INVALID_PLAN.
+		 */
+		if (first_plan >= last_plan)
+			return PA_INVALID_PLAN;
+
+		return first_plan;
+	}
 }
diff --git a/src/backend/nodes/copyfuncs.c b/src/backend/nodes/copyfuncs.c
index f9ddf4e..1b8a7d1 100644
--- a/src/backend/nodes/copyfuncs.c
+++ b/src/backend/nodes/copyfuncs.c
@@ -242,6 +242,7 @@ _copyAppend(const Append *from)
 	 */
 	COPY_NODE_FIELD(partitioned_rels);
 	COPY_NODE_FIELD(appendplans);
+	COPY_SCALAR_FIELD(first_partial_plan);
 
 	return newnode;
 }
diff --git a/src/backend/nodes/list.c b/src/backend/nodes/list.c
index acaf4b5..75761a9 100644
--- a/src/backend/nodes/list.c
+++ b/src/backend/nodes/list.c
@@ -1250,6 +1250,45 @@ list_copy_tail(const List *oldlist, int nskip)
 }
 
 /*
+ * Sort a list using qsort. A new List header is built, but the ListCells of
+ * the original list are re-used; the caller has to pass a copy of the list
+ * if the original needs to stay untouched. The comparator function receives
+ * pointers to the ListCell pointers (i.e., ListCell **). 
+ */ +List * +list_qsort(const List *list, list_qsort_comparator cmp) +{ + ListCell *cell; + int i; + int len = list_length(list); + ListCell **list_arr; + List *new_list; + + if (len == 0) + return NIL; + + i = 0; + list_arr = palloc(sizeof(ListCell *) * len); + foreach(cell, list) + list_arr[i++] = cell; + + qsort(list_arr, len, sizeof(ListCell *), cmp); + + new_list = (List *) palloc(sizeof(List)); + new_list->type = T_List; + new_list->length = len; + new_list->head = list_arr[0]; + new_list->tail = list_arr[len-1]; + + for (i = 0; i < len-1; i++) + list_arr[i]->next = list_arr[i+1]; + + list_arr[len-1]->next = NULL; + pfree(list_arr); + return new_list; +} + +/* * Temporary compatibility functions * * In order to avoid warnings for these function definitions, we need diff --git a/src/backend/nodes/outfuncs.c b/src/backend/nodes/outfuncs.c index 9ee3e23..b1d8bc7 100644 --- a/src/backend/nodes/outfuncs.c +++ b/src/backend/nodes/outfuncs.c @@ -394,6 +394,7 @@ _outAppend(StringInfo str, const Append *node) WRITE_NODE_FIELD(partitioned_rels); WRITE_NODE_FIELD(appendplans); + WRITE_INT_FIELD(first_partial_plan); } static void diff --git a/src/backend/nodes/readfuncs.c b/src/backend/nodes/readfuncs.c index 67b9e19..a25213f 100644 --- a/src/backend/nodes/readfuncs.c +++ b/src/backend/nodes/readfuncs.c @@ -1594,6 +1594,7 @@ _readAppend(void) READ_NODE_FIELD(partitioned_rels); READ_NODE_FIELD(appendplans); + READ_INT_FIELD(first_partial_plan); READ_DONE(); } diff --git a/src/backend/optimizer/path/allpaths.c b/src/backend/optimizer/path/allpaths.c index 2d7e1d8..5f58fe1 100644 --- a/src/backend/optimizer/path/allpaths.c +++ b/src/backend/optimizer/path/allpaths.c @@ -101,6 +101,9 @@ static Path *get_cheapest_parameterized_child_path(PlannerInfo *root, RelOptInfo *rel, Relids required_outer); static List *accumulate_append_subpath(List *subpaths, Path *path); +static List *accumulate_partialappend_subpath(List *partial_subpaths, + Path *subpath, bool is_partial, + List **nonpartial_subpaths); static void set_subquery_pathlist(PlannerInfo *root, RelOptInfo *rel, Index rti, RangeTblEntry *rte); static void set_function_pathlist(PlannerInfo *root, RelOptInfo *rel, @@ -1281,7 +1284,11 @@ add_paths_to_append_rel(PlannerInfo *root, RelOptInfo *rel, List *subpaths = NIL; bool subpaths_valid = true; List *partial_subpaths = NIL; + List *pa_partial_subpaths = NIL; + List *pa_nonpartial_subpaths = NIL; bool partial_subpaths_valid = true; + bool pa_subpaths_valid = enable_parallelappend; + bool pa_all_partial_subpaths = enable_parallelappend; List *all_child_pathkeys = NIL; List *all_child_outers = NIL; ListCell *l; @@ -1317,7 +1324,65 @@ add_paths_to_append_rel(PlannerInfo *root, RelOptInfo *rel, else subpaths_valid = false; - /* Same idea, but for a partial plan. */ + /* Same idea, but for a parallel append path. */ + if (pa_subpaths_valid && enable_parallelappend) + { + Path *chosen_path = NULL; + Path *cheapest_partial_path = NULL; + Path *cheapest_parallel_safe_path = NULL; + + /* + * Extract the cheapest unparameterized, parallel-safe one among + * the child paths. + */ + cheapest_parallel_safe_path = + get_cheapest_parallel_safe_total_inner(childrel->pathlist); + + /* Get the cheapest partial path */ + if (childrel->partial_pathlist != NIL) + cheapest_partial_path = linitial(childrel->partial_pathlist); + + if (!cheapest_parallel_safe_path && !cheapest_partial_path) + { + /* + * This child rel neither has a partial path, nor has a + * parallel-safe path. Drop the idea for parallel append. 
+ */ + pa_subpaths_valid = false; + } + else if (cheapest_partial_path && cheapest_parallel_safe_path) + { + /* Both are valid. Choose the cheaper out of the two */ + if (cheapest_parallel_safe_path->total_cost < + cheapest_partial_path->total_cost) + chosen_path = cheapest_parallel_safe_path; + else + chosen_path = cheapest_partial_path; + } + else + { + /* Either one is valid. Choose the valid one */ + chosen_path = cheapest_partial_path ? + cheapest_partial_path : + cheapest_parallel_safe_path; + } + + /* If we got a valid path, add it */ + if (chosen_path) + { + pa_partial_subpaths = + accumulate_partialappend_subpath( + pa_partial_subpaths, + chosen_path, + chosen_path == cheapest_partial_path, + &pa_nonpartial_subpaths); + } + + if (chosen_path && chosen_path != cheapest_partial_path) + pa_all_partial_subpaths = false; + } + + /* Same idea, but for a non-parallel partial plan. */ if (childrel->partial_pathlist != NIL) partial_subpaths = accumulate_append_subpath(partial_subpaths, linitial(childrel->partial_pathlist)); @@ -1395,23 +1460,39 @@ add_paths_to_append_rel(PlannerInfo *root, RelOptInfo *rel, * if we have zero or one live subpath due to constraint exclusion.) */ if (subpaths_valid) - add_path(rel, (Path *) create_append_path(rel, subpaths, NULL, 0, + add_path(rel, (Path *) create_append_path(rel, subpaths, NIL, + NULL, 0, false, partitioned_rels)); + /* Consider parallel append path. */ + if (pa_subpaths_valid) + { + AppendPath *appendpath; + int parallel_workers; + + parallel_workers = get_append_num_workers(pa_partial_subpaths, + pa_nonpartial_subpaths); + appendpath = create_append_path(rel, pa_nonpartial_subpaths, + pa_partial_subpaths, + NULL, parallel_workers, true, + partitioned_rels); + add_partial_path(rel, (Path *) appendpath); + } + /* - * Consider an append of partial unordered, unparameterized partial paths. + * Consider non-parallel partial append path. But if the parallel append + * path is made out of all partial subpaths, don't create another partial + * path; we will keep only the parallel append path in that case. */ - if (partial_subpaths_valid) + if (partial_subpaths_valid && !pa_all_partial_subpaths) { AppendPath *appendpath; ListCell *lc; int parallel_workers = 0; /* - * Decide on the number of workers to request for this append path. - * For now, we just use the maximum value from among the members. It - * might be useful to use a higher number if the Append node were - * smart enough to spread out the workers, but it currently isn't. + * To decide the number of workers, just use the maximum value from + * among the children. */ foreach(lc, partial_subpaths) { @@ -1421,9 +1502,9 @@ add_paths_to_append_rel(PlannerInfo *root, RelOptInfo *rel, } Assert(parallel_workers > 0); - /* Generate a partial append path. */ - appendpath = create_append_path(rel, partial_subpaths, NULL, - parallel_workers, partitioned_rels); + appendpath = create_append_path(rel, NIL, partial_subpaths, + NULL, parallel_workers, false, + partitioned_rels); add_partial_path(rel, (Path *) appendpath); } @@ -1476,7 +1557,8 @@ add_paths_to_append_rel(PlannerInfo *root, RelOptInfo *rel, if (subpaths_valid) add_path(rel, (Path *) - create_append_path(rel, subpaths, required_outer, 0, + create_append_path(rel, subpaths, NIL, + required_outer, 0, false, partitioned_rels)); } } @@ -1694,6 +1776,78 @@ accumulate_append_subpath(List *subpaths, Path *path) } /* + * accumulate_partialappend_subpath: + * Add a subpath to the list being built for a partial Append. 
+ *
+ * This is the same as accumulate_append_subpath, except that two separate
+ * lists are built: one containing only partial subpaths and the other
+ * containing only non-partial subpaths. The non-partial paths are later
+ * sorted by descending total cost (in create_append_path).
+ *
+ * is_partial is true if the subpath being added is a partial subpath.
+ */
+static List *
+accumulate_partialappend_subpath(List *partial_subpaths,
+								 Path *subpath, bool is_partial,
+								 List **nonpartial_subpaths)
+{
+	/* list_copy is important here to avoid sharing list substructure */
+
+	if (IsA(subpath, AppendPath))
+	{
+		AppendPath *apath = (AppendPath *) subpath;
+		List	   *apath_partial_paths;
+		List	   *apath_nonpartial_paths;
+
+		/* Split the Append subpaths into partial and non-partial paths */
+		apath_nonpartial_paths = list_truncate(list_copy(apath->subpaths),
+											   apath->first_partial_path);
+		apath_partial_paths = list_copy_tail(apath->subpaths,
+											 apath->first_partial_path);
+
+		/* Add non-partial subpaths, if any. */
+		*nonpartial_subpaths = list_concat(*nonpartial_subpaths,
+										   list_copy(apath_nonpartial_paths));
+
+		/* Add partial subpaths, if any. */
+		return list_concat(partial_subpaths, apath_partial_paths);
+	}
+	else if (IsA(subpath, MergeAppendPath))
+	{
+		MergeAppendPath *mpath = (MergeAppendPath *) subpath;
+
+		/*
+		 * If the MergeAppend is partial, all its child plans have to be
+		 * partial: we don't currently support a mix of partial and
+		 * non-partial MergeAppend subpaths.
+		 */
+		if (is_partial)
+			return list_concat(partial_subpaths, list_copy(mpath->subpaths));
+		else
+		{
+			/*
+			 * Since the MergeAppendPath itself is non-partial, treat all its
+			 * subpaths as non-partial.
+			 */
+			*nonpartial_subpaths = list_concat(*nonpartial_subpaths,
+											   list_copy(mpath->subpaths));
+			return partial_subpaths;
+		}
+	}
+	else
+	{
+		/* Just add it to the right list depending upon whether it's partial */
+		if (is_partial)
+			return lappend(partial_subpaths, subpath);
+		else
+		{
+			*nonpartial_subpaths = lappend(*nonpartial_subpaths, subpath);
+			return partial_subpaths;
+		}
+	}
+}
+
+/*
  * set_dummy_rel_pathlist
  *	  Build a dummy path for a relation that's been excluded by constraints
  *
@@ -1713,7 +1867,8 @@ set_dummy_rel_pathlist(RelOptInfo *rel)
 	rel->pathlist = NIL;
 	rel->partial_pathlist = NIL;
 
-	add_path(rel, (Path *) create_append_path(rel, NIL, NULL, 0, NIL));
+	add_path(rel, (Path *) create_append_path(rel, NIL, NIL, NULL,
+											  0, false, NIL));
 
 	/*
 	 * We set the cheapest path immediately, to ensure that IS_DUMMY_REL()
diff --git a/src/backend/optimizer/path/costsize.c b/src/backend/optimizer/path/costsize.c
index 051a854..79c08aa 100644
--- a/src/backend/optimizer/path/costsize.c
+++ b/src/backend/optimizer/path/costsize.c
@@ -127,6 +127,7 @@ bool		enable_material = true;
 bool		enable_mergejoin = true;
 bool		enable_hashjoin = true;
 bool		enable_gathermerge = true;
+bool		enable_parallelappend = true;
 
 typedef struct
 {
@@ -159,6 +160,8 @@ static Selectivity get_foreign_key_join_selectivity(PlannerInfo *root,
 					   Relids inner_relids,
 					   SpecialJoinInfo *sjinfo,
 					   List **restrictlist);
+static Cost append_nonpartial_cost(List *subpaths, int numpaths,
+				   int parallel_workers);
 static void set_rel_width(PlannerInfo *root, RelOptInfo *rel);
 static double relation_byte_size(double tuples, int width);
 static double page_size(double tuples, int width);
@@ -1741,6 +1744,189 @@ cost_sort(Path *path, PlannerInfo *root,
 }
 
 /*
+ * append_nonpartial_cost
+ *	  Determines and returns the cost of the non-partial subpaths of an Append node. 
+ * + * It is the total cost units taken by all the workers to finish all the + * non-partial subpaths. + * subpaths contains non-partial paths followed by partial paths. + * numpaths tells the number of non-partial paths. + */ +static Cost +append_nonpartial_cost(List *subpaths, int numpaths, int parallel_workers) +{ + Cost *costarr; + int arrlen; + ListCell *l; + ListCell *cell; + int i; + int path_index; + int min_index; + int max_index; + + if (numpaths == 0) + return 0; + + /* + * Build the cost array containing costs of first n number of subpaths, + * where n = parallel_workers. Also, its size is kept only as long as the + * number of subpaths, or parallel_workers, whichever is minimum. + */ + arrlen = Min(parallel_workers, numpaths); + costarr = (Cost *) palloc(sizeof(Cost) * arrlen); + path_index = 0; + foreach(cell, subpaths) + { + Path *subpath = (Path *) lfirst(cell); + + if (path_index == arrlen) + break; + costarr[path_index++] = subpath->total_cost; + } + + /* + * Since the subpaths are non-partial paths, the array is initially sorted + * by decreasing cost. So choose the last one for the index with minimum + * cost. + */ + min_index = arrlen - 1; + + /* + * For each of the remaining subpaths, add its cost to the array element + * with minimum cost. + */ + for_each_cell(l, cell) + { + Path *subpath = (Path *) lfirst(l); + int i; + + /* Consider only the non-partial paths */ + if (path_index++ == numpaths) + break; + + costarr[min_index] += subpath->total_cost; + + /* Update the new min cost array index */ + for (min_index = i = 0; i < arrlen; i++) + { + if (costarr[i] < costarr[min_index]) + min_index = i; + } + } + + /* Return the highest cost from the array */ + for (max_index = i = 0; i < arrlen; i++) + { + if (costarr[i] > costarr[max_index]) + max_index = i; + } + + return costarr[max_index]; +} + +/* + * cost_append + * Determines and returns the cost of an Append node. + * + * We charge nothing extra for the Append itself, which perhaps is too + * optimistic, but since it doesn't do any selection or projection, it is a + * pretty cheap node. + */ +void +cost_append(Path *path, List *subpaths, int num_nonpartial_subpaths) +{ + ListCell *l; + + path->rows = 0; + path->startup_cost = 0; + path->total_cost = 0; + + if (list_length(subpaths) == 0) + return; + + if (!path->parallel_aware) + { + Path *subpath = (Path *) linitial(subpaths); + + /* + * Startup cost of non-parallel-aware Append is the startup cost of + * first subpath. + */ + path->startup_cost = subpath->startup_cost; + + /* Compute rows and costs as sums of subplan rows and costs. */ + foreach(l, subpaths) + { + Path *subpath = (Path *) lfirst(l); + + path->rows += subpath->rows; + path->total_cost += subpath->total_cost; + } + } + else /* parallel-aware */ + { + double max_rows = 0; + double nonpartial_rows = 0; + int i = 0; + + /* Include the non-partial paths total cost */ + path->total_cost += append_nonpartial_cost(subpaths, + num_nonpartial_subpaths, + path->parallel_workers); + + /* Calculate startup cost; also add up all the rows for later use */ + foreach(l, subpaths) + { + Path *subpath = (Path *) lfirst(l); + + /* + * Append would start returning tuples when the child node having + * lowest startup cost is done setting up. We consider only the + * first few subplans that immediately get a worker assigned. 
+ */ + if (i < path->parallel_workers) + { + path->startup_cost = Min(path->startup_cost, + subpath->startup_cost); + } + + if (i < num_nonpartial_subpaths) + { + nonpartial_rows += subpath->rows; + + /* Also keep track of max rows for any given subpath */ + max_rows = Max(max_rows, subpath->rows); + } + + i++; + } + + /* + * As an approximation, non-partial rows are calculated as total rows + * divided by number of workers. But if there are highly unequal number + * of rows across the paths, this figure might not reflect correctly. + * So we make a note that it also should not be less than the maximum + * of all the path rows. + */ + nonpartial_rows /= path->parallel_workers; + path->rows += Max(nonpartial_rows, max_rows); + + /* Calculate partial paths cost. */ + if (list_length(subpaths) > num_nonpartial_subpaths) + { + /* Compute rows and costs as sums of subplan rows and costs. */ + for_each_cell(l, list_nth_cell(subpaths, num_nonpartial_subpaths)) + { + Path *subpath = (Path *) lfirst(l); + + path->rows += subpath->rows; + path->total_cost += subpath->total_cost; + } + } + } +} + +/* * cost_merge_append * Determines and returns the cost of a MergeAppend node. * diff --git a/src/backend/optimizer/path/joinrels.c b/src/backend/optimizer/path/joinrels.c index 6ee2350..0eee647 100644 --- a/src/backend/optimizer/path/joinrels.c +++ b/src/backend/optimizer/path/joinrels.c @@ -1217,7 +1217,8 @@ mark_dummy_rel(RelOptInfo *rel) rel->partial_pathlist = NIL; /* Set up the dummy path */ - add_path(rel, (Path *) create_append_path(rel, NIL, NULL, 0, NIL)); + add_path(rel, (Path *) create_append_path(rel, NIL, NIL, NULL, + 0, false, NIL)); /* Set or update cheapest_total_path and related fields */ set_cheapest(rel); diff --git a/src/backend/optimizer/plan/createplan.c b/src/backend/optimizer/plan/createplan.c index 2821662..055ac64 100644 --- a/src/backend/optimizer/plan/createplan.c +++ b/src/backend/optimizer/plan/createplan.c @@ -203,7 +203,8 @@ static NamedTuplestoreScan *make_namedtuplestorescan(List *qptlist, List *qpqual Index scanrelid, char *enrname); static WorkTableScan *make_worktablescan(List *qptlist, List *qpqual, Index scanrelid, int wtParam); -static Append *make_append(List *appendplans, List *tlist, List *partitioned_rels); +static Append *make_append(List *appendplans, int first_partial_plan, + List *tlist, List *partitioned_rels); static RecursiveUnion *make_recursive_union(List *tlist, Plan *lefttree, Plan *righttree, @@ -1049,7 +1050,8 @@ create_append_plan(PlannerInfo *root, AppendPath *best_path) * parent-rel Vars it'll be asked to emit. 
*/ - plan = make_append(subplans, tlist, best_path->partitioned_rels); + plan = make_append(subplans, best_path->first_partial_path, + tlist, best_path->partitioned_rels); copy_generic_path_info(&plan->plan, (Path *) best_path); @@ -5274,7 +5276,7 @@ make_foreignscan(List *qptlist, } static Append * -make_append(List *appendplans, List *tlist, List *partitioned_rels) +make_append(List *appendplans, int first_partial_plan, List *tlist, List *partitioned_rels) { Append *node = makeNode(Append); Plan *plan = &node->plan; @@ -5285,6 +5287,7 @@ make_append(List *appendplans, List *tlist, List *partitioned_rels) plan->righttree = NULL; node->partitioned_rels = partitioned_rels; node->appendplans = appendplans; + node->first_partial_plan = first_partial_plan; return node; } diff --git a/src/backend/optimizer/plan/planner.c b/src/backend/optimizer/plan/planner.c index 9662302..d9fab6b 100644 --- a/src/backend/optimizer/plan/planner.c +++ b/src/backend/optimizer/plan/planner.c @@ -3603,8 +3603,10 @@ create_grouping_paths(PlannerInfo *root, path = (Path *) create_append_path(grouped_rel, paths, + NIL, NULL, 0, + false, NIL); path->pathtarget = target; } diff --git a/src/backend/optimizer/prep/prepunion.c b/src/backend/optimizer/prep/prepunion.c index e73c819..4af5740 100644 --- a/src/backend/optimizer/prep/prepunion.c +++ b/src/backend/optimizer/prep/prepunion.c @@ -578,8 +578,8 @@ generate_union_path(SetOperationStmt *op, PlannerInfo *root, /* * Append the child results together. */ - path = (Path *) create_append_path(result_rel, pathlist, NULL, 0, NIL); - + path = (Path *) create_append_path(result_rel, pathlist, NIL, + NULL, 0, false, NIL); /* We have to manually jam the right tlist into the path; ick */ path->pathtarget = create_pathtarget(root, tlist); @@ -690,7 +690,8 @@ generate_nonunion_path(SetOperationStmt *op, PlannerInfo *root, /* * Append the child results together. */ - path = (Path *) create_append_path(result_rel, pathlist, NULL, 0, NIL); + path = (Path *) create_append_path(result_rel, pathlist, NIL, + NULL, 0, false, NIL); /* We have to manually jam the right tlist into the path; ick */ path->pathtarget = create_pathtarget(root, tlist); diff --git a/src/backend/optimizer/util/pathnode.c b/src/backend/optimizer/util/pathnode.c index 26567cb..846f33e 100644 --- a/src/backend/optimizer/util/pathnode.c +++ b/src/backend/optimizer/util/pathnode.c @@ -46,6 +46,7 @@ typedef enum #define STD_FUZZ_FACTOR 1.01 static List *translate_sub_tlist(List *tlist, int relid); +static int append_total_cost_compare(const void *a, const void *b); /***************************************************************************** @@ -1193,6 +1194,70 @@ create_tidscan_path(PlannerInfo *root, RelOptInfo *rel, List *tidquals, } /* + * get_append_num_workers + * Return the number of workers to request for partial append path. + */ +int +get_append_num_workers(List *partial_subpaths, List *nonpartial_subpaths) +{ + ListCell *lc; + double log2w; + int num_workers; + int max_per_plan_workers; + + /* + * log2(number_of_subpaths)+1 formula seems to give an appropriate number of + * workers for Append path either having high number of children (> 100) or + * having all non-partial subpaths or subpaths with 1-2 parallel_workers. + * Whereas, if the subpaths->parallel_workers is high, this formula is not + * suitable, because it does not take into account per-subpath workers. 
+ * For e.g., with 3 subplans having per-subplan workers such as (2, 8, 8), + * the Append workers should be at least 8, whereas the formula gives 2. In + * this case, it seems better to follow the method used for calculating + * parallel_workers of an unpartitioned table : log3(table_size). So we + * treat a partitioned table as if the data belongs to a single + * unpartitioned table, and then derive its workers. So it will be : + * logb(b^w1 + b^w2 + b^w3) where w1, w2.. are per-subplan workers and + * b is some logarithmic base such as 2 or 3. It turns out that this + * evaluates to a value just a bit greater than max(w1,w2, w3). So, we + * just use the maximum of workers formula. But this formula gives too few + * workers when all paths have single worker (meaning they are non-partial) + * For e.g. with workers : (1, 1, 1, 1, 1, 1), it is better to allocate 3 + * workers, whereas this method allocates only 1. + * So we use whichever method that gives higher number of workers. + */ + + /* Get log2(num_subpaths) */ + log2w = fls(list_length(partial_subpaths) + + list_length(nonpartial_subpaths)); + + /* Avoid further calculations if we already crossed max workers limit */ + if (max_parallel_workers_per_gather <= log2w + 1) + return max_parallel_workers_per_gather; + + + /* + * Get the parallel_workers value of the partial subpath having the highest + * parallel_workers. + */ + max_per_plan_workers = 1; + foreach(lc, partial_subpaths) + { + Path *subpath = lfirst(lc); + max_per_plan_workers = Max(max_per_plan_workers, + subpath->parallel_workers); + } + + /* Choose the higher of the results of the two formulae */ + num_workers = rint(Max(log2w, max_per_plan_workers) + 1); + + /* In no case use more than max_parallel_workers_per_gather workers. */ + num_workers = Min(num_workers, max_parallel_workers_per_gather); + + return num_workers; +} + +/* * create_append_path * Creates a path corresponding to an Append plan, returning the * pathnode. @@ -1200,8 +1265,11 @@ create_tidscan_path(PlannerInfo *root, RelOptInfo *rel, List *tidquals, * Note that we must handle subpaths = NIL, representing a dummy access path. */ AppendPath * -create_append_path(RelOptInfo *rel, List *subpaths, Relids required_outer, - int parallel_workers, List *partitioned_rels) +create_append_path(RelOptInfo *rel, + List *subpaths, List *partial_subpaths, + Relids required_outer, + int parallel_workers, bool parallel_aware, + List *partitioned_rels) { AppendPath *pathnode = makeNode(AppendPath); ListCell *l; @@ -1211,43 +1279,50 @@ create_append_path(RelOptInfo *rel, List *subpaths, Relids required_outer, pathnode->path.pathtarget = rel->reltarget; pathnode->path.param_info = get_appendrel_parampathinfo(rel, required_outer); - pathnode->path.parallel_aware = false; + pathnode->path.parallel_aware = (parallel_aware && parallel_workers > 0); pathnode->path.parallel_safe = rel->consider_parallel; pathnode->path.parallel_workers = parallel_workers; pathnode->path.pathkeys = NIL; /* result is always considered unsorted */ pathnode->partitioned_rels = list_copy(partitioned_rels); - pathnode->subpaths = subpaths; - /* - * We don't bother with inventing a cost_append(), but just do it here. - * - * Compute rows and costs as sums of subplan rows and costs. We charge - * nothing extra for the Append itself, which perhaps is too optimistic, - * but since it doesn't do any selection or projection, it is a pretty - * cheap node. 
- */ - pathnode->path.rows = 0; - pathnode->path.startup_cost = 0; - pathnode->path.total_cost = 0; + /* For parallel append, non-partial paths are sorted by descending costs */ + if (pathnode->path.parallel_aware) + subpaths = list_qsort(subpaths, append_total_cost_compare); + + pathnode->first_partial_path = list_length(subpaths); + pathnode->subpaths = list_concat(subpaths, partial_subpaths); + foreach(l, subpaths) { Path *subpath = (Path *) lfirst(l); - pathnode->path.rows += subpath->rows; - - if (l == list_head(subpaths)) /* first node? */ - pathnode->path.startup_cost = subpath->startup_cost; - pathnode->path.total_cost += subpath->total_cost; pathnode->path.parallel_safe = pathnode->path.parallel_safe && - subpath->parallel_safe; + subpath->parallel_safe; /* All child paths must have same parameterization */ Assert(bms_equal(PATH_REQ_OUTER(subpath), required_outer)); } + cost_append(&pathnode->path, pathnode->subpaths, + pathnode->first_partial_path); + return pathnode; } +static int +append_total_cost_compare(const void *a, const void *b) +{ + Path *path1 = (Path *) lfirst(*(ListCell **) a); + Path *path2 = (Path *) lfirst(*(ListCell **) b); + + if (path1->total_cost > path2->total_cost) + return -1; + if (path1->total_cost < path2->total_cost) + return 1; + + return 0; +} + /* * create_merge_append_path * Creates a path corresponding to a MergeAppend plan, returning the diff --git a/src/backend/storage/lmgr/lwlock.c b/src/backend/storage/lmgr/lwlock.c index 82a1cf5..f2770fa 100644 --- a/src/backend/storage/lmgr/lwlock.c +++ b/src/backend/storage/lmgr/lwlock.c @@ -494,7 +494,7 @@ RegisterLWLockTranches(void) if (LWLockTrancheArray == NULL) { - LWLockTranchesAllocated = 64; + LWLockTranchesAllocated = 128; LWLockTrancheArray = (char **) MemoryContextAllocZero(TopMemoryContext, LWLockTranchesAllocated * sizeof(char *)); @@ -511,6 +511,7 @@ RegisterLWLockTranches(void) LWLockRegisterTranche(LWTRANCHE_PARALLEL_QUERY_DSA, "parallel_query_dsa"); LWLockRegisterTranche(LWTRANCHE_TBM, "tbm"); + LWLockRegisterTranche(LWTRANCHE_PARALLEL_APPEND, "parallel_append"); /* Register named tranches. 
*/ for (i = 0; i < NamedLWLockTrancheRequests; i++) diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c index 246fea8..0782aa3 100644 --- a/src/backend/utils/misc/guc.c +++ b/src/backend/utils/misc/guc.c @@ -910,6 +910,15 @@ static struct config_bool ConfigureNamesBool[] = true, NULL, NULL, NULL }, + { + {"enable_parallelappend", PGC_USERSET, QUERY_TUNING_METHOD, + gettext_noop("Enables the planner's use of parallel append plans."), + NULL + }, + &enable_parallelappend, + true, + NULL, NULL, NULL + }, { {"geqo", PGC_USERSET, QUERY_TUNING_GEQO, diff --git a/src/backend/utils/misc/postgresql.conf.sample b/src/backend/utils/misc/postgresql.conf.sample index df5d2f3..0a079b2 100644 --- a/src/backend/utils/misc/postgresql.conf.sample +++ b/src/backend/utils/misc/postgresql.conf.sample @@ -297,6 +297,7 @@ #enable_material = on #enable_mergejoin = on #enable_nestloop = on +#enable_parallelappend = on #enable_seqscan = on #enable_sort = on #enable_tidscan = on diff --git a/src/include/executor/nodeAppend.h b/src/include/executor/nodeAppend.h index 4e38a13..7d9e881 100644 --- a/src/include/executor/nodeAppend.h +++ b/src/include/executor/nodeAppend.h @@ -14,10 +14,14 @@ #ifndef NODEAPPEND_H #define NODEAPPEND_H +#include "access/parallel.h" #include "nodes/execnodes.h" extern AppendState *ExecInitAppend(Append *node, EState *estate, int eflags); extern void ExecEndAppend(AppendState *node); extern void ExecReScanAppend(AppendState *node); +extern void ExecAppendEstimate(AppendState *node, ParallelContext *pcxt); +extern void ExecAppendInitializeDSM(AppendState *node, ParallelContext *pcxt); +extern void ExecAppendInitializeWorker(AppendState *node, shm_toc *toc); #endif /* NODEAPPEND_H */ diff --git a/src/include/nodes/execnodes.h b/src/include/nodes/execnodes.h index 6cf128a..2642502 100644 --- a/src/include/nodes/execnodes.h +++ b/src/include/nodes/execnodes.h @@ -21,6 +21,7 @@ #include "lib/pairingheap.h" #include "nodes/params.h" #include "nodes/plannodes.h" +#include "storage/spin.h" #include "utils/hsearch.h" #include "utils/queryenvironment.h" #include "utils/reltrigger.h" @@ -995,12 +996,15 @@ typedef struct ModifyTableState * whichplan which plan is being executed (0 .. 
n-1) * ---------------- */ +struct ParallelAppendDescData; typedef struct AppendState { PlanState ps; /* its first field is NodeTag */ PlanState **appendplans; /* array of PlanStates for my inputs */ int as_nplans; int as_whichplan; + struct ParallelAppendDescData *as_padesc; /* parallel coordination info */ + Size pappend_len; /* size of parallel coordination info */ } AppendState; /* ---------------- diff --git a/src/include/nodes/pg_list.h b/src/include/nodes/pg_list.h index 667d5e2..711db92 100644 --- a/src/include/nodes/pg_list.h +++ b/src/include/nodes/pg_list.h @@ -269,6 +269,9 @@ extern void list_free_deep(List *list); extern List *list_copy(const List *list); extern List *list_copy_tail(const List *list, int nskip); +typedef int (*list_qsort_comparator) (const void *a, const void *b); +extern List *list_qsort(const List *list, list_qsort_comparator cmp); + /* * To ease migration to the new list API, a set of compatibility * macros are provided that reduce the impact of the list API changes diff --git a/src/include/nodes/plannodes.h b/src/include/nodes/plannodes.h index a382331..1678497 100644 --- a/src/include/nodes/plannodes.h +++ b/src/include/nodes/plannodes.h @@ -248,6 +248,7 @@ typedef struct Append /* RT indexes of non-leaf tables in a partition tree */ List *partitioned_rels; List *appendplans; + int first_partial_plan; } Append; /* ---------------- diff --git a/src/include/nodes/relation.h b/src/include/nodes/relation.h index a39e59d..bdf1152 100644 --- a/src/include/nodes/relation.h +++ b/src/include/nodes/relation.h @@ -1167,10 +1167,14 @@ typedef struct CustomPath * AppendPath represents an Append plan, ie, successive execution of * several member plans. * + * For partial Append, 'subpaths' contains non-partial subpaths followed by + * partial subpaths. + * * Note: it is possible for "subpaths" to contain only one, or even no, * elements. These cases are optimized during create_append_plan. * In particular, an AppendPath with no subpaths is a "dummy" path that * is created to represent the case that a relation is provably empty. 
+ * */ typedef struct AppendPath { @@ -1178,6 +1182,9 @@ typedef struct AppendPath /* RT indexes of non-leaf tables in a partition tree */ List *partitioned_rels; List *subpaths; /* list of component Paths */ + + /* Index of first partial path in subpaths */ + int first_partial_path; } AppendPath; #define IS_DUMMY_PATH(p) \ diff --git a/src/include/optimizer/cost.h b/src/include/optimizer/cost.h index 63feba0..8e66cf0 100644 --- a/src/include/optimizer/cost.h +++ b/src/include/optimizer/cost.h @@ -67,6 +67,7 @@ extern bool enable_material; extern bool enable_mergejoin; extern bool enable_hashjoin; extern bool enable_gathermerge; +extern bool enable_parallelappend; extern int constraint_exclusion; extern double clamp_row_est(double nrows); @@ -105,6 +106,8 @@ extern void cost_sort(Path *path, PlannerInfo *root, List *pathkeys, Cost input_cost, double tuples, int width, Cost comparison_cost, int sort_mem, double limit_tuples); +extern void cost_append(Path *path, List *subpaths, + int num_nonpartial_subpaths); extern void cost_merge_append(Path *path, PlannerInfo *root, List *pathkeys, int n_streams, Cost input_startup_cost, Cost input_total_cost, diff --git a/src/include/optimizer/pathnode.h b/src/include/optimizer/pathnode.h index e372f88..d578a5d 100644 --- a/src/include/optimizer/pathnode.h +++ b/src/include/optimizer/pathnode.h @@ -14,6 +14,7 @@ #ifndef PATHNODE_H #define PATHNODE_H +#include "nodes/bitmapset.h" #include "nodes/relation.h" @@ -63,9 +64,13 @@ extern BitmapOrPath *create_bitmap_or_path(PlannerInfo *root, List *bitmapquals); extern TidPath *create_tidscan_path(PlannerInfo *root, RelOptInfo *rel, List *tidquals, Relids required_outer); -extern AppendPath *create_append_path(RelOptInfo *rel, List *subpaths, - Relids required_outer, int parallel_workers, - List *partitioned_rels); +extern int get_append_num_workers(List *partial_subpaths, + List *nonpartial_subpaths); +extern AppendPath *create_append_path(RelOptInfo *rel, + List *subpaths, List *partial_subpaths, + Relids required_outer, + int parallel_workers, bool parallel_aware, + List *partitioned_rels); extern MergeAppendPath *create_merge_append_path(PlannerInfo *root, RelOptInfo *rel, List *subpaths, diff --git a/src/include/storage/lwlock.h b/src/include/storage/lwlock.h index 3d16132..35adf12 100644 --- a/src/include/storage/lwlock.h +++ b/src/include/storage/lwlock.h @@ -213,6 +213,7 @@ typedef enum BuiltinTrancheIds LWTRANCHE_PREDICATE_LOCK_MANAGER, LWTRANCHE_PARALLEL_QUERY_DSA, LWTRANCHE_TBM, + LWTRANCHE_PARALLEL_APPEND, LWTRANCHE_FIRST_USER_DEFINED } BuiltinTrancheIds; diff --git a/src/test/regress/expected/inherit.out b/src/test/regress/expected/inherit.out index 1fa9650..7a5b3c7 100644 --- a/src/test/regress/expected/inherit.out +++ b/src/test/regress/expected/inherit.out @@ -1382,6 +1382,7 @@ select min(1-id) from matest0; reset enable_indexscan; set enable_seqscan = off; -- plan with fewest seqscans should be merge +set enable_parallelappend = off; -- Don't let parallel-append interfere explain (verbose, costs off) select * from matest0 order by 1-id; QUERY PLAN ------------------------------------------------------------------ @@ -1448,6 +1449,7 @@ select min(1-id) from matest0; (1 row) reset enable_seqscan; +reset enable_parallelappend; drop table matest0 cascade; NOTICE: drop cascades to 3 other objects DETAIL: drop cascades to table matest1 diff --git a/src/test/regress/expected/select_parallel.out b/src/test/regress/expected/select_parallel.out index 888da5a..daf00c2 100644 --- 
a/src/test/regress/expected/select_parallel.out +++ b/src/test/regress/expected/select_parallel.out @@ -11,15 +11,16 @@ set parallel_setup_cost=0; set parallel_tuple_cost=0; set min_parallel_table_scan_size=0; set max_parallel_workers_per_gather=4; +-- test Parallel Append. explain (costs off) - select count(*) from a_star; + select round(avg(aa)), sum(aa) from a_star; QUERY PLAN ----------------------------------------------------- Finalize Aggregate -> Gather - Workers Planned: 1 + Workers Planned: 4 -> Partial Aggregate - -> Append + -> Parallel Append -> Parallel Seq Scan on a_star -> Parallel Seq Scan on b_star -> Parallel Seq Scan on c_star @@ -28,12 +29,40 @@ explain (costs off) -> Parallel Seq Scan on f_star (11 rows) -select count(*) from a_star; - count -------- - 50 +select round(avg(aa)), sum(aa) from a_star; + round | sum +-------+----- + 14 | 355 +(1 row) + +-- Mix of partial and non-partial subplans. +alter table c_star set (parallel_workers = 0); +alter table d_star set (parallel_workers = 0); +explain (costs off) + select round(avg(aa)), sum(aa) from a_star; + QUERY PLAN +----------------------------------------------------- + Finalize Aggregate + -> Gather + Workers Planned: 4 + -> Partial Aggregate + -> Parallel Append + -> Seq Scan on d_star + -> Seq Scan on c_star + -> Parallel Seq Scan on a_star + -> Parallel Seq Scan on b_star + -> Parallel Seq Scan on e_star + -> Parallel Seq Scan on f_star +(11 rows) + +select round(avg(aa)), sum(aa) from a_star; + round | sum +-------+----- + 14 | 355 (1 row) +alter table c_star reset (parallel_workers); +alter table d_star reset (parallel_workers); -- test that parallel_restricted function doesn't run in worker alter table tenk1 set (parallel_workers = 4); explain (verbose, costs off) diff --git a/src/test/regress/expected/sysviews.out b/src/test/regress/expected/sysviews.out index 568b783..97a9843 100644 --- a/src/test/regress/expected/sysviews.out +++ b/src/test/regress/expected/sysviews.out @@ -70,21 +70,22 @@ select count(*) >= 0 as ok from pg_prepared_xacts; -- This is to record the prevailing planner enable_foo settings during -- a regression test run. select name, setting from pg_settings where name like 'enable%'; - name | setting -----------------------+--------- - enable_bitmapscan | on - enable_gathermerge | on - enable_hashagg | on - enable_hashjoin | on - enable_indexonlyscan | on - enable_indexscan | on - enable_material | on - enable_mergejoin | on - enable_nestloop | on - enable_seqscan | on - enable_sort | on - enable_tidscan | on -(12 rows) + name | setting +-----------------------+--------- + enable_bitmapscan | on + enable_gathermerge | on + enable_hashagg | on + enable_hashjoin | on + enable_indexonlyscan | on + enable_indexscan | on + enable_material | on + enable_mergejoin | on + enable_nestloop | on + enable_parallelappend | on + enable_seqscan | on + enable_sort | on + enable_tidscan | on +(13 rows) -- Test that the pg_timezone_names and pg_timezone_abbrevs views are -- more-or-less working. 
We can't test their contents in any great detail diff --git a/src/test/regress/sql/inherit.sql b/src/test/regress/sql/inherit.sql index c96580c..60ac387 100644 --- a/src/test/regress/sql/inherit.sql +++ b/src/test/regress/sql/inherit.sql @@ -491,11 +491,13 @@ select min(1-id) from matest0; reset enable_indexscan; set enable_seqscan = off; -- plan with fewest seqscans should be merge +set enable_parallelappend = off; -- Don't let parallel-append interfere explain (verbose, costs off) select * from matest0 order by 1-id; select * from matest0 order by 1-id; explain (verbose, costs off) select min(1-id) from matest0; select min(1-id) from matest0; reset enable_seqscan; +reset enable_parallelappend; drop table matest0 cascade; diff --git a/src/test/regress/sql/select_parallel.sql b/src/test/regress/sql/select_parallel.sql index cefb5a2..5aafc4d 100644 --- a/src/test/regress/sql/select_parallel.sql +++ b/src/test/regress/sql/select_parallel.sql @@ -15,9 +15,18 @@ set parallel_tuple_cost=0; set min_parallel_table_scan_size=0; set max_parallel_workers_per_gather=4; +-- test Parallel Append. explain (costs off) - select count(*) from a_star; -select count(*) from a_star; + select round(avg(aa)), sum(aa) from a_star; +select round(avg(aa)), sum(aa) from a_star; +-- Mix of partial and non-partial subplans. +alter table c_star set (parallel_workers = 0); +alter table d_star set (parallel_workers = 0); +explain (costs off) + select round(avg(aa)), sum(aa) from a_star; +select round(avg(aa)), sum(aa) from a_star; +alter table c_star reset (parallel_workers); +alter table d_star reset (parallel_workers); -- test that parallel_restricted function doesn't run in worker alter table tenk1 set (parallel_workers = 4);
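
Supplementary usage sketch (not part of the patch): the regression changes above exercise Parallel Append through the a_star/b_star inheritance tree; the SQL below is one way a reviewer might reproduce the same behaviour by hand on a scratch database. The table names, row counts, and cost settings are illustrative assumptions only; the enable_parallelappend GUC, the parallel_workers reloption, and the "Parallel Append" plan node come from the patch itself.

-- Hypothetical inheritance tree; names and sizes are made up for illustration.
create table pa_parent (id int, val int);
create table pa_child1 () inherits (pa_parent);
create table pa_child2 () inherits (pa_parent);
insert into pa_child1 select g, g % 100 from generate_series(1, 100000) g;
insert into pa_child2 select g, g % 100 from generate_series(1, 100000) g;
analyze pa_child1;
analyze pa_child2;

-- Same planner settings select_parallel.sql uses to make parallelism cheap.
set parallel_setup_cost = 0;
set parallel_tuple_cost = 0;
set min_parallel_table_scan_size = 0;
set max_parallel_workers_per_gather = 4;

-- Forcing one child to have no partial path should give a mix of partial and
-- non-partial subplans under a single Parallel Append node.
alter table pa_child2 set (parallel_workers = 0);
explain (costs off) select count(*) from pa_parent;

-- For comparison, disable the new path type and re-plan.
set enable_parallelappend = off;
explain (costs off) select count(*) from pa_parent;
reset enable_parallelappend;
alter table pa_child2 reset (parallel_workers);

While such a query runs, a worker blocked on choosing its next subplan should show the new parallel_append LWLock wait event in pg_stat_activity, per the monitoring.sgml change above.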