diff --git a/doc/src/sgml/config.sgml b/doc/src/sgml/config.sgml
index 2b6255e..2ad1397 100644
--- a/doc/src/sgml/config.sgml
+++ b/doc/src/sgml/config.sgml
@@ -3679,6 +3679,20 @@ ANY num_sync (
+     <varlistentry id="guc-enable-parallelappend" xreflabel="enable_parallelappend">
+      <term><varname>enable_parallelappend</varname> (<type>boolean</type>)
+      <indexterm>
+       <primary><varname>enable_parallelappend</varname> configuration parameter</primary>
+      </indexterm>
+      </term>
+      <listitem>
+       <para>
+        Enables or disables the query planner's use of parallel-aware
+        append plan types. The default is <literal>on</literal>.
+       </para>
+      </listitem>
+     </varlistentry>
+
enable_seqscan (boolean)
diff --git a/doc/src/sgml/monitoring.sgml b/doc/src/sgml/monitoring.sgml
index 5575c2c..b2fa245 100644
--- a/doc/src/sgml/monitoring.sgml
+++ b/doc/src/sgml/monitoring.sgml
@@ -845,7 +845,7 @@ postgres 27093 0.0 0.0 30096 2752 ? Ss 11:34 0:00 postgres: ser
- LWLock>
+ LWLock>
ShmemIndexLock>
Waiting to find or allocate space in shared memory.
@@ -1109,6 +1109,11 @@ postgres 27093 0.0 0.0 30096 2752 ? Ss 11:34 0:00 postgres: ser
Waiting for TBM shared iterator lock.
+     <row>
+      <entry><literal>parallel_append</literal></entry>
+      <entry>Waiting to choose the next subplan during Parallel Append plan
+       execution.</entry>
+     </row>
Lock>
relation>
Waiting to acquire a lock on a relation.
diff --git a/src/backend/executor/execParallel.c b/src/backend/executor/execParallel.c
index c713b85..df0acdb 100644
--- a/src/backend/executor/execParallel.c
+++ b/src/backend/executor/execParallel.c
@@ -25,6 +25,7 @@
#include "executor/execParallel.h"
#include "executor/executor.h"
+#include "executor/nodeAppend.h"
#include "executor/nodeBitmapHeapscan.h"
#include "executor/nodeCustom.h"
#include "executor/nodeForeignscan.h"
@@ -244,6 +245,11 @@ ExecParallelEstimate(PlanState *planstate, ExecParallelEstimateContext *e)
ExecForeignScanEstimate((ForeignScanState *) planstate,
e->pcxt);
break;
+ case T_AppendState:
+ if (planstate->plan->parallel_aware)
+ ExecAppendEstimate((AppendState *) planstate,
+ e->pcxt);
+ break;
case T_CustomScanState:
if (planstate->plan->parallel_aware)
ExecCustomScanEstimate((CustomScanState *) planstate,
@@ -316,6 +322,11 @@ ExecParallelInitializeDSM(PlanState *planstate,
ExecForeignScanInitializeDSM((ForeignScanState *) planstate,
d->pcxt);
break;
+ case T_AppendState:
+ if (planstate->plan->parallel_aware)
+ ExecAppendInitializeDSM((AppendState *) planstate,
+ d->pcxt);
+ break;
case T_CustomScanState:
if (planstate->plan->parallel_aware)
ExecCustomScanInitializeDSM((CustomScanState *) planstate,
@@ -908,6 +919,10 @@ ExecParallelInitializeWorker(PlanState *planstate, shm_toc *toc)
ExecForeignScanInitializeWorker((ForeignScanState *) planstate,
toc);
break;
+ case T_AppendState:
+ if (planstate->plan->parallel_aware)
+ ExecAppendInitializeWorker((AppendState *) planstate, toc);
+ break;
case T_CustomScanState:
if (planstate->plan->parallel_aware)
ExecCustomScanInitializeWorker((CustomScanState *) planstate,
diff --git a/src/backend/executor/nodeAppend.c b/src/backend/executor/nodeAppend.c
index bed9bb8..11f9688 100644
--- a/src/backend/executor/nodeAppend.c
+++ b/src/backend/executor/nodeAppend.c
@@ -60,10 +60,46 @@
#include "executor/execdebug.h"
#include "executor/nodeAppend.h"
#include "miscadmin.h"
+#include "optimizer/cost.h"
+#include "storage/spin.h"
-static TupleTableSlot *ExecAppend(PlanState *pstate);
-static bool exec_append_initialize_next(AppendState *appendstate);
+/*
+ * Shared state for Parallel Append.
+ *
+ * Each backend participating in a Parallel Append has its own
+ * descriptor in backend-private memory, and those objects all contain
+ * a pointer to this structure.
+ */
+typedef struct ParallelAppendDescData
+{
+ LWLock pa_lock; /* mutual exclusion to choose next subplan */
+ int pa_first_plan; /* plan to wrap around to; advanced past taken non-partial plans */
+ int pa_next_plan; /* plan the next worker should start searching from */
+
+ /*
+ * pa_finished[i] is set to true when subplan i no longer needs workers:
+ * a worker which finishes a subplan sets its flag so that no new worker
+ * picks that subplan up. For a non-partial subplan, the worker which
+ * picks it up sets the flag immediately, to make sure that no more than
+ * one worker is ever assigned to it.
+ */
+ bool pa_finished[FLEXIBLE_ARRAY_MEMBER];
+} ParallelAppendDescData;
+
+typedef ParallelAppendDescData *ParallelAppendDesc;
+
+/*
+ * Special value of AppendState->as_whichplan for Parallel Append, which
+ * indicates there are no plans left to be executed.
+ */
+#define PA_INVALID_PLAN -1
+static TupleTableSlot *ExecAppend(PlanState *pstate);
+static bool exec_append_seq_next(AppendState *appendstate);
+static bool exec_append_parallel_next(AppendState *state);
+static bool exec_append_leader_next(AppendState *state);
+static int exec_append_get_next_plan(int curplan, int first_plan,
+ int last_plan);
/* ----------------------------------------------------------------
* exec_append_initialize_next
@@ -74,11 +110,20 @@ static bool exec_append_initialize_next(AppendState *appendstate);
* ----------------------------------------------------------------
*/
static bool
-exec_append_initialize_next(AppendState *appendstate)
+exec_append_seq_next(AppendState *appendstate)
{
int whichplan;
/*
+ * Not parallel-aware. Fine, just go on to the next subplan in the
+ * appropriate direction.
+ */
+ if (ScanDirectionIsForward(appendstate->ps.state->es_direction))
+ appendstate->as_whichplan++;
+ else
+ appendstate->as_whichplan--;
+
+ /*
* get information from the append node
*/
whichplan = appendstate->as_whichplan;
@@ -185,10 +230,10 @@ ExecInitAppend(Append *node, EState *estate, int eflags)
appendstate->ps.ps_ProjInfo = NULL;
/*
- * initialize to scan first subplan
+ * Initialize to scan first subplan (but note that we'll override this
+ * later in the case of a parallel append).
*/
appendstate->as_whichplan = 0;
- exec_append_initialize_next(appendstate);
return appendstate;
}
@@ -204,6 +249,14 @@ ExecAppend(PlanState *pstate)
{
AppendState *node = castNode(AppendState, pstate);
+ /*
+ * Check whether we have already run out of subplans for a parallel
+ * append. This can happen if all the subplans were finished before
+ * this worker even started returning tuples.
+ */
+ if (node->as_padesc && node->as_whichplan == PA_INVALID_PLAN)
+ return ExecClearTuple(node->ps.ps_ResultTupleSlot);
+
for (;;)
{
PlanState *subnode;
@@ -232,16 +285,31 @@ ExecAppend(PlanState *pstate)
}
/*
- * Go on to the "next" subplan in the appropriate direction. If no
- * more subplans, return the empty slot set up for us by
- * ExecInitAppend.
+ * Go on to the "next" subplan. If no more subplans, return the empty
+ * slot set up for us by ExecInitAppend.
*/
- if (ScanDirectionIsForward(node->ps.state->es_direction))
- node->as_whichplan++;
+ if (!node->as_padesc)
+ {
+ /*
+ * Not parallel-aware. Just advance to the next subplan in the
+ * appropriate direction.
+ */
+ if (!exec_append_seq_next(node))
+ return ExecClearTuple(node->ps.ps_ResultTupleSlot);
+ }
else
- node->as_whichplan--;
- if (!exec_append_initialize_next(node))
- return ExecClearTuple(node->ps.ps_ResultTupleSlot);
+ {
+ /*
+ * This is a parallel-aware Append, and we are done with this subplan.
+ * There might be other workers still processing the last chunk of
+ * rows for this same subplan, but there's no point in new workers
+ * picking it up, so mark this subplan as finished.
+ */
+ node->as_padesc->pa_finished[node->as_whichplan] = true;
+
+ if (!exec_append_parallel_next(node))
+ return ExecClearTuple(node->ps.ps_ResultTupleSlot);
+ }
/* Else loop back and try to get a tuple from the new subplan */
}
@@ -279,6 +347,7 @@ void
ExecReScanAppend(AppendState *node)
{
int i;
+ ParallelAppendDesc padesc = node->as_padesc;
for (i = 0; i < node->as_nplans; i++)
{
@@ -298,6 +367,276 @@ ExecReScanAppend(AppendState *node)
if (subnode->chgParam == NULL)
ExecReScan(subnode);
}
+
+ if (padesc)
+ {
+ padesc->pa_first_plan = padesc->pa_next_plan = 0;
+ memset(padesc->pa_finished, 0, sizeof(bool) * node->as_nplans);
+ }
+
node->as_whichplan = 0;
- exec_append_initialize_next(node);
+}
+
+/* ----------------------------------------------------------------
+ * Parallel Append Support
+ * ----------------------------------------------------------------
+ */
+
+/* ----------------------------------------------------------------
+ * ExecAppendEstimate
+ *
+ * Estimates the space required for Parallel Append's shared coordination state.
+ * ----------------------------------------------------------------
+ */
+void
+ExecAppendEstimate(AppendState *node,
+ ParallelContext *pcxt)
+{
+ node->pappend_len =
+ add_size(offsetof(struct ParallelAppendDescData, pa_finished),
+ sizeof(bool) * node->as_nplans);
+
+ shm_toc_estimate_chunk(&pcxt->estimator, node->pappend_len);
+ shm_toc_estimate_keys(&pcxt->estimator, 1);
+}
+
+
+/* ----------------------------------------------------------------
+ * ExecAppendInitializeDSM
+ *
+ * Set up a Parallel Append descriptor.
+ * ----------------------------------------------------------------
+ */
+void
+ExecAppendInitializeDSM(AppendState *node,
+ ParallelContext *pcxt)
+{
+ ParallelAppendDesc padesc;
+
+ padesc = shm_toc_allocate(pcxt->toc, node->pappend_len);
+
+ /*
+ * Just setting all the fields to 0 is enough. The logic of choosing the
+ * next plan in workers will take care of everything else.
+ */
+ memset(padesc, 0, sizeof(ParallelAppendDescData));
+ memset(padesc->pa_finished, 0, sizeof(bool) * node->as_nplans);
+
+ LWLockInitialize(&padesc->pa_lock, LWTRANCHE_PARALLEL_APPEND);
+
+ node->as_padesc = padesc;
+
+ shm_toc_insert(pcxt->toc, node->ps.plan->plan_node_id, padesc);
+
+ /* Choose the first subplan to be executed. */
+ (void) exec_append_parallel_next(node);
+}
+
+/* ----------------------------------------------------------------
+ * ExecAppendInitializeWorker
+ *
+ * Copy relevant information from the TOC into the planstate, and
+ * initialize whatever is required to choose and execute the first subplan.
+ * ----------------------------------------------------------------
+ */
+void
+ExecAppendInitializeWorker(AppendState *node, shm_toc *toc)
+{
+ node->as_padesc = shm_toc_lookup(toc, node->ps.plan->plan_node_id, false);
+
+ /* Choose the first subplan to be executed. */
+ (void) exec_append_parallel_next(node);
+}
+
+/* ----------------------------------------------------------------
+ * exec_append_parallel_next
+ *
+ * Determine the next subplan that should be executed. Each worker starts
+ * looking for an unfinished subplan at the shared pa_next_plan index, and
+ * after picking one it advances that index, so that other workers know
+ * where to start looking next. This way workers pick up subplans in
+ * round-robin order and get distributed evenly among them.
+ *
+ * Returns false if and only if all subplans are already finished
+ * processing.
+ * ----------------------------------------------------------------
+ */
+static bool
+exec_append_parallel_next(AppendState *state)
+{
+ ParallelAppendDesc padesc = state->as_padesc;
+ int whichplan;
+ int initial_plan;
+ int first_partial_plan = ((Append *)state->ps.plan)->first_partial_plan;
+ bool found;
+
+ Assert(padesc != NULL);
+
+ /* Backward scan is not supported by parallel-aware plans */
+ Assert(ScanDirectionIsForward(state->ps.state->es_direction));
+
+ /* The parallel leader chooses its next subplan differently */
+ if (!IsParallelWorker())
+ return exec_append_leader_next(state);
+
+ LWLockAcquire(&padesc->pa_lock, LW_EXCLUSIVE);
+
+ /* Make a note of which subplan we have started with */
+ initial_plan = padesc->pa_next_plan;
+
+ /*
+ * Keep going to the next plan until we find an unfinished one. In the
+ * process, also keep track of the first unfinished non-partial subplan. As
+ * the non-partial subplans are taken one by one, the first unfinished
+ * subplan index will shift ahead, so that we don't have to visit the
+ * finished non-partial ones anymore.
+ */
+
+ found = false;
+ for (whichplan = initial_plan; whichplan != PA_INVALID_PLAN;)
+ {
+ /*
+ * Ignore plans that are already done processing. These also include
+ * non-partial subplans which have already been taken by a worker.
+ */
+ if (!padesc->pa_finished[whichplan])
+ {
+ found = true;
+ break;
+ }
+
+ /*
+ * Note: There is a chance that just after the child plan node is
+ * chosen above, some other worker finishes this node and sets
+ * pa_finished to true. In that case, this worker will go ahead and
+ * call ExecProcNode(child_node), which will return NULL tuple since it
+ * is already finished, and then once again this worker will try to
+ * choose next subplan; but this is ok : it's just an extra
+ * "choose_next_subplan" operation.
+ */
+
+ /* Either go to the next plan, or wrap around to the first one */
+ whichplan = exec_append_get_next_plan(whichplan, padesc->pa_first_plan,
+ state->as_nplans - 1);
+
+ /*
+ * If we have wrapped around and returned to the same index again, we
+ * are done scanning.
+ */
+ if (whichplan == initial_plan)
+ break;
+ }
+
+ if (!found)
+ {
+ /*
+ * We didn't find any plan to execute. Stop executing, and let the
+ * other workers know that there is no next plan.
+ */
+ padesc->pa_next_plan = state->as_whichplan = PA_INVALID_PLAN;
+ }
+ else
+ {
+ /*
+ * If this is a non-partial plan, immediately mark it finished, and
+ * advance pa_first_plan past it.
+ */
+ if (whichplan < first_partial_plan)
+ {
+ padesc->pa_finished[whichplan] = true;
+ padesc->pa_first_plan = whichplan + 1;
+ }
+
+ /*
+ * Set the chosen plan, and the next plan to be picked by other
+ * workers.
+ */
+ state->as_whichplan = whichplan;
+ padesc->pa_next_plan = exec_append_get_next_plan(whichplan,
+ padesc->pa_first_plan,
+ state->as_nplans - 1);
+ }
+
+ LWLockRelease(&padesc->pa_lock);
+
+ return found;
+}
+
+/* ----------------------------------------------------------------
+ * exec_append_leader_next
+ *
+ * Used only by the parallel leader. The leader scans backwards from
+ * the last plan, so that it does not end up taking the most expensive
+ * non-partial plan, i.e. the first subplan.
+ * ----------------------------------------------------------------
+ */
+static bool
+exec_append_leader_next(AppendState *state)
+{
+ ParallelAppendDesc padesc = state->as_padesc;
+ int first_plan;
+ int whichplan;
+ int first_partial_plan = ((Append *)state->ps.plan)->first_partial_plan;
+
+ LWLockAcquire(&padesc->pa_lock, LW_EXCLUSIVE);
+
+ /* The parallel leader should start from the last subplan. */
+ first_plan = padesc->pa_first_plan;
+
+ for (whichplan = state->as_nplans - 1; whichplan >= first_plan;
+ whichplan--)
+ {
+ if (!padesc->pa_finished[whichplan])
+ {
+ /* If this is a non-partial plan, immediately mark it finished */
+ if (whichplan < first_partial_plan)
+ padesc->pa_finished[whichplan] = true;
+
+ break;
+ }
+ }
+
+ LWLockRelease(&padesc->pa_lock);
+
+ /* Return false only if we didn't find any plan to execute */
+ if (whichplan < first_plan)
+ {
+ state->as_whichplan = PA_INVALID_PLAN;
+ return false;
+ }
+ else
+ {
+ state->as_whichplan = whichplan;
+ return true;
+ }
+}
+
+/* ----------------------------------------------------------------
+ * exec_append_get_next_plan
+ *
+ * Either go to the next index, or wrap around to the first unfinished one.
+ * Returns this next index. While wrapping around, if the first unfinished
+ * one itself is past the last plan, returns PA_INVALID_PLAN.
+ * ----------------------------------------------------------------
+ */
+static int
+exec_append_get_next_plan(int curplan, int first_plan, int last_plan)
+{
+ Assert(curplan <= last_plan);
+
+ if (curplan < last_plan)
+ return curplan + 1;
+ else
+ {
+ /*
+ * We are already at the last plan. If first_plan itself is the last
+ * plan or is past it, there is no next plan remaining; return
+ * PA_INVALID_PLAN.
+ */
+ if (first_plan >= last_plan)
+ return PA_INVALID_PLAN;
+
+ return first_plan;
+ }
}
diff --git a/src/backend/nodes/copyfuncs.c b/src/backend/nodes/copyfuncs.c
index f9ddf4e..1b8a7d1 100644
--- a/src/backend/nodes/copyfuncs.c
+++ b/src/backend/nodes/copyfuncs.c
@@ -242,6 +242,7 @@ _copyAppend(const Append *from)
*/
COPY_NODE_FIELD(partitioned_rels);
COPY_NODE_FIELD(appendplans);
+ COPY_SCALAR_FIELD(first_partial_plan);
return newnode;
}
diff --git a/src/backend/nodes/list.c b/src/backend/nodes/list.c
index acaf4b5..75761a9 100644
--- a/src/backend/nodes/list.c
+++ b/src/backend/nodes/list.c
@@ -1250,6 +1250,45 @@ list_copy_tail(const List *oldlist, int nskip)
}
/*
+ * Sort a list using qsort. A new sorted list is built, but the cells of the
+ * original list are re-used. The caller has to pass a copy of the list if
+ * the original list must remain untouched. The comparator function is
+ * passed pointers to ListCell pointers.
+ */
+List *
+list_qsort(const List *list, list_qsort_comparator cmp)
+{
+ ListCell *cell;
+ int i;
+ int len = list_length(list);
+ ListCell **list_arr;
+ List *new_list;
+
+ if (len == 0)
+ return NIL;
+
+ i = 0;
+ list_arr = palloc(sizeof(ListCell *) * len);
+ foreach(cell, list)
+ list_arr[i++] = cell;
+
+ qsort(list_arr, len, sizeof(ListCell *), cmp);
+
+ new_list = (List *) palloc(sizeof(List));
+ new_list->type = T_List;
+ new_list->length = len;
+ new_list->head = list_arr[0];
+ new_list->tail = list_arr[len-1];
+
+ for (i = 0; i < len-1; i++)
+ list_arr[i]->next = list_arr[i+1];
+
+ list_arr[len-1]->next = NULL;
+ pfree(list_arr);
+ return new_list;
+}
+
+/*
* Temporary compatibility functions
*
* In order to avoid warnings for these function definitions, we need
diff --git a/src/backend/nodes/outfuncs.c b/src/backend/nodes/outfuncs.c
index 9ee3e23..b1d8bc7 100644
--- a/src/backend/nodes/outfuncs.c
+++ b/src/backend/nodes/outfuncs.c
@@ -394,6 +394,7 @@ _outAppend(StringInfo str, const Append *node)
WRITE_NODE_FIELD(partitioned_rels);
WRITE_NODE_FIELD(appendplans);
+ WRITE_INT_FIELD(first_partial_plan);
}
static void
diff --git a/src/backend/nodes/readfuncs.c b/src/backend/nodes/readfuncs.c
index 67b9e19..a25213f 100644
--- a/src/backend/nodes/readfuncs.c
+++ b/src/backend/nodes/readfuncs.c
@@ -1594,6 +1594,7 @@ _readAppend(void)
READ_NODE_FIELD(partitioned_rels);
READ_NODE_FIELD(appendplans);
+ READ_INT_FIELD(first_partial_plan);
READ_DONE();
}
diff --git a/src/backend/optimizer/path/allpaths.c b/src/backend/optimizer/path/allpaths.c
index 2d7e1d8..5f58fe1 100644
--- a/src/backend/optimizer/path/allpaths.c
+++ b/src/backend/optimizer/path/allpaths.c
@@ -101,6 +101,9 @@ static Path *get_cheapest_parameterized_child_path(PlannerInfo *root,
RelOptInfo *rel,
Relids required_outer);
static List *accumulate_append_subpath(List *subpaths, Path *path);
+static List *accumulate_partialappend_subpath(List *partial_subpaths,
+ Path *subpath, bool is_partial,
+ List **nonpartial_subpaths);
static void set_subquery_pathlist(PlannerInfo *root, RelOptInfo *rel,
Index rti, RangeTblEntry *rte);
static void set_function_pathlist(PlannerInfo *root, RelOptInfo *rel,
@@ -1281,7 +1284,11 @@ add_paths_to_append_rel(PlannerInfo *root, RelOptInfo *rel,
List *subpaths = NIL;
bool subpaths_valid = true;
List *partial_subpaths = NIL;
+ List *pa_partial_subpaths = NIL;
+ List *pa_nonpartial_subpaths = NIL;
bool partial_subpaths_valid = true;
+ bool pa_subpaths_valid = enable_parallelappend; /* still a Parallel Append candidate? */
+ bool pa_all_partial_subpaths = enable_parallelappend; /* all chosen subpaths partial? */
List *all_child_pathkeys = NIL;
List *all_child_outers = NIL;
ListCell *l;
@@ -1317,7 +1324,65 @@ add_paths_to_append_rel(PlannerInfo *root, RelOptInfo *rel,
else
subpaths_valid = false;
- /* Same idea, but for a partial plan. */
+ /* Same idea, but for a parallel append path. */
+ if (pa_subpaths_valid && enable_parallelappend)
+ {
+ Path *chosen_path = NULL;
+ Path *cheapest_partial_path = NULL;
+ Path *cheapest_parallel_safe_path = NULL;
+
+ /*
+ * Extract the cheapest unparameterized, parallel-safe one among
+ * the child paths.
+ */
+ cheapest_parallel_safe_path =
+ get_cheapest_parallel_safe_total_inner(childrel->pathlist);
+
+ /* Get the cheapest partial path */
+ if (childrel->partial_pathlist != NIL)
+ cheapest_partial_path = linitial(childrel->partial_pathlist);
+
+ if (!cheapest_parallel_safe_path && !cheapest_partial_path)
+ {
+ /*
+ * This child rel has neither a partial path nor a parallel-safe
+ * path. Drop the idea of a parallel append.
+ */
+ pa_subpaths_valid = false;
+ }
+ else if (cheapest_partial_path && cheapest_parallel_safe_path)
+ {
+ /* Both are valid. Choose the cheaper out of the two */
+ if (cheapest_parallel_safe_path->total_cost <
+ cheapest_partial_path->total_cost)
+ chosen_path = cheapest_parallel_safe_path;
+ else
+ chosen_path = cheapest_partial_path;
+ }
+ else
+ {
+ /* Exactly one of the two is valid. Choose that one. */
+ chosen_path = cheapest_partial_path ?
+ cheapest_partial_path :
+ cheapest_parallel_safe_path;
+ }
+
+ /* If we got a valid path, add it */
+ if (chosen_path)
+ {
+ pa_partial_subpaths =
+ accumulate_partialappend_subpath(
+ pa_partial_subpaths,
+ chosen_path,
+ chosen_path == cheapest_partial_path,
+ &pa_nonpartial_subpaths);
+ }
+
+ if (chosen_path && chosen_path != cheapest_partial_path)
+ pa_all_partial_subpaths = false;
+ }
+
+ /* Same idea, but for a non-parallel partial plan. */
if (childrel->partial_pathlist != NIL)
partial_subpaths = accumulate_append_subpath(partial_subpaths,
linitial(childrel->partial_pathlist));
@@ -1395,23 +1460,39 @@ add_paths_to_append_rel(PlannerInfo *root, RelOptInfo *rel,
* if we have zero or one live subpath due to constraint exclusion.)
*/
if (subpaths_valid)
- add_path(rel, (Path *) create_append_path(rel, subpaths, NULL, 0,
+ add_path(rel, (Path *) create_append_path(rel, subpaths, NIL,
+ NULL, 0, false,
partitioned_rels));
+ /* Consider parallel append path. */
+ if (pa_subpaths_valid)
+ {
+ AppendPath *appendpath;
+ int parallel_workers;
+
+ parallel_workers = get_append_num_workers(pa_partial_subpaths,
+ pa_nonpartial_subpaths);
+ appendpath = create_append_path(rel, pa_nonpartial_subpaths,
+ pa_partial_subpaths,
+ NULL, parallel_workers, true,
+ partitioned_rels);
+ add_partial_path(rel, (Path *) appendpath);
+ }
+
/*
- * Consider an append of partial unordered, unparameterized partial paths.
+ * Consider non-parallel partial append path. But if the parallel append
+ * path is made out of all partial subpaths, don't create another partial
+ * path; we will keep only the parallel append path in that case.
*/
- if (partial_subpaths_valid)
+ if (partial_subpaths_valid && !pa_all_partial_subpaths)
{
AppendPath *appendpath;
ListCell *lc;
int parallel_workers = 0;
/*
- * Decide on the number of workers to request for this append path.
- * For now, we just use the maximum value from among the members. It
- * might be useful to use a higher number if the Append node were
- * smart enough to spread out the workers, but it currently isn't.
+ * To decide the number of workers, just use the maximum value from
+ * among the children.
*/
foreach(lc, partial_subpaths)
{
@@ -1421,9 +1502,9 @@ add_paths_to_append_rel(PlannerInfo *root, RelOptInfo *rel,
}
Assert(parallel_workers > 0);
- /* Generate a partial append path. */
- appendpath = create_append_path(rel, partial_subpaths, NULL,
- parallel_workers, partitioned_rels);
+ appendpath = create_append_path(rel, NIL, partial_subpaths,
+ NULL, parallel_workers, false,
+ partitioned_rels);
add_partial_path(rel, (Path *) appendpath);
}
@@ -1476,7 +1557,8 @@ add_paths_to_append_rel(PlannerInfo *root, RelOptInfo *rel,
if (subpaths_valid)
add_path(rel, (Path *)
- create_append_path(rel, subpaths, required_outer, 0,
+ create_append_path(rel, subpaths, NIL,
+ required_outer, 0, false,
partitioned_rels));
}
}
@@ -1694,6 +1776,78 @@ accumulate_append_subpath(List *subpaths, Path *path)
}
/*
+ * accumulate_partialappend_subpath:
+ * Add a subpath to the list being built for a partial Append.
+ *
+ * This is the same as accumulate_append_subpath, except that two separate lists
+ * are created, one containing only partial subpaths, and the other containing
+ * only non-partial subpaths. Also, the non-partial paths are kept ordered
+ * by descending total cost.
+ *
+ * is_partial is true if the subpath being added is a partial subpath.
+ */
+static List *
+accumulate_partialappend_subpath(List *partial_subpaths,
+ Path *subpath, bool is_partial,
+ List **nonpartial_subpaths)
+{
+ /* list_copy is important here to avoid sharing list substructure */
+
+ if (IsA(subpath, AppendPath))
+ {
+ AppendPath *apath = (AppendPath *) subpath;
+ List *apath_partial_paths;
+ List *apath_nonpartial_paths;
+
+ /* Split the Append subpaths into partial and non-partial paths */
+ apath_nonpartial_paths = list_truncate(list_copy(apath->subpaths),
+ apath->first_partial_path);
+ apath_partial_paths = list_copy_tail(apath->subpaths,
+ apath->first_partial_path);
+
+ /* Add non-partial subpaths, if any. */
+ *nonpartial_subpaths = list_concat(*nonpartial_subpaths,
+ list_copy(apath_nonpartial_paths));
+
+ /* Add partial subpaths, if any. */
+ return list_concat(partial_subpaths, apath_partial_paths);
+ }
+ else if (IsA(subpath, MergeAppendPath))
+ {
+ MergeAppendPath *mpath = (MergeAppendPath *) subpath;
+
+ /*
+ * If the MergeAppend is partial, all of its child plans have to be
+ * partial: we don't currently support a mix of partial and
+ * non-partial MergeAppend subpaths.
+ */
+ if (is_partial)
+ return list_concat(partial_subpaths, list_copy(mpath->subpaths));
+ else
+ {
+ /*
+ * Since the MergeAppendPath itself is non-partial, treat all its
+ * subpaths as non-partial.
+ */
+ *nonpartial_subpaths = list_concat(*nonpartial_subpaths,
+ list_copy(mpath->subpaths));
+ return partial_subpaths;
+ }
+ }
+ else
+ {
+ /* Just add it to the right list depending upon whether it's partial */
+ if (is_partial)
+ return lappend(partial_subpaths, subpath);
+ else
+ {
+ *nonpartial_subpaths = lappend(*nonpartial_subpaths, subpath);
+ return partial_subpaths;
+ }
+ }
+}
+
+/*
* set_dummy_rel_pathlist
* Build a dummy path for a relation that's been excluded by constraints
*
@@ -1713,7 +1867,8 @@ set_dummy_rel_pathlist(RelOptInfo *rel)
rel->pathlist = NIL;
rel->partial_pathlist = NIL;
- add_path(rel, (Path *) create_append_path(rel, NIL, NULL, 0, NIL));
+ add_path(rel, (Path *) create_append_path(rel, NIL, NIL, NULL,
+ 0, false, NIL));
/*
* We set the cheapest path immediately, to ensure that IS_DUMMY_REL()
diff --git a/src/backend/optimizer/path/costsize.c b/src/backend/optimizer/path/costsize.c
index 051a854..79c08aa 100644
--- a/src/backend/optimizer/path/costsize.c
+++ b/src/backend/optimizer/path/costsize.c
@@ -127,6 +127,7 @@ bool enable_material = true;
bool enable_mergejoin = true;
bool enable_hashjoin = true;
bool enable_gathermerge = true;
+bool enable_parallelappend = true;
typedef struct
{
@@ -159,6 +160,8 @@ static Selectivity get_foreign_key_join_selectivity(PlannerInfo *root,
Relids inner_relids,
SpecialJoinInfo *sjinfo,
List **restrictlist);
+static Cost append_nonpartial_cost(List *subpaths, int numpaths,
+ int parallel_workers);
static void set_rel_width(PlannerInfo *root, RelOptInfo *rel);
static double relation_byte_size(double tuples, int width);
static double page_size(double tuples, int width);
@@ -1741,6 +1744,189 @@ cost_sort(Path *path, PlannerInfo *root,
}
/*
+ * append_nonpartial_cost
+ * Determines and returns the cost of the non-partial subpaths of an
+ * Append node.
+ *
+ * This is the total time (in cost units) the pool of workers needs to
+ * finish all of the non-partial subpaths, i.e. the largest amount of work
+ * assigned to any one worker. subpaths contains the non-partial paths
+ * followed by the partial paths; numpaths tells the number of non-partial
+ * paths.
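+ *
+ * As an illustration (with made-up numbers): given parallel_workers = 2 and
+ * non-partial subpath costs 8, 6, 3 and 2 (already sorted by descending
+ * cost), the first two costs seed the two workers (costarr = {8, 6}); the
+ * cost 3 is added to the cheaper slot (costarr = {8, 9}), then the cost 2
+ * to the now-cheaper slot (costarr = {10, 9}); the function returns 10,
+ * the busiest worker's total.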
+ */
+static Cost
+append_nonpartial_cost(List *subpaths, int numpaths, int parallel_workers)
+{
+ Cost *costarr;
+ int arrlen;
+ ListCell *l;
+ ListCell *cell;
+ int i;
+ int path_index;
+ int min_index;
+ int max_index;
+
+ if (numpaths == 0)
+ return 0;
+
+ /*
+ * Build an array containing the total costs of the first N subpaths,
+ * where N is the smaller of parallel_workers and the number of
+ * non-partial subpaths.
+ */
+ arrlen = Min(parallel_workers, numpaths);
+ costarr = (Cost *) palloc(sizeof(Cost) * arrlen);
+ path_index = 0;
+ foreach(cell, subpaths)
+ {
+ Path *subpath = (Path *) lfirst(cell);
+
+ if (path_index == arrlen)
+ break;
+ costarr[path_index++] = subpath->total_cost;
+ }
+
+ /*
+ * Since the subpaths are non-partial paths, the array is initially sorted
+ * by decreasing cost. So choose the last one for the index with minimum
+ * cost.
+ */
+ min_index = arrlen - 1;
+
+ /*
+ * For each of the remaining subpaths, add its cost to the array element
+ * with minimum cost.
+ */
+ for_each_cell(l, cell)
+ {
+ Path *subpath = (Path *) lfirst(l);
+ int i;
+
+ /* Consider only the non-partial paths */
+ if (path_index++ == numpaths)
+ break;
+
+ costarr[min_index] += subpath->total_cost;
+
+ /* Update the new min cost array index */
+ for (min_index = i = 0; i < arrlen; i++)
+ {
+ if (costarr[i] < costarr[min_index])
+ min_index = i;
+ }
+ }
+
+ /* Return the highest cost from the array */
+ for (max_index = i = 0; i < arrlen; i++)
+ {
+ if (costarr[i] > costarr[max_index])
+ max_index = i;
+ }
+
+ return costarr[max_index];
+}
+
+/*
+ * cost_append
+ * Determines and returns the cost of an Append node.
+ *
+ * We charge nothing extra for the Append itself, which perhaps is too
+ * optimistic, but since it doesn't do any selection or projection, it is a
+ * pretty cheap node.
+ */
+void
+cost_append(Path *path, List *subpaths, int num_nonpartial_subpaths)
+{
+ ListCell *l;
+
+ path->rows = 0;
+ path->startup_cost = 0;
+ path->total_cost = 0;
+
+ if (list_length(subpaths) == 0)
+ return;
+
+ if (!path->parallel_aware)
+ {
+ Path *subpath = (Path *) linitial(subpaths);
+
+ /*
+ * Startup cost of a non-parallel-aware Append is the startup cost of
+ * its first subpath.
+ */
+ path->startup_cost = subpath->startup_cost;
+
+ /* Compute rows and costs as sums of subplan rows and costs. */
+ foreach(l, subpaths)
+ {
+ Path *subpath = (Path *) lfirst(l);
+
+ path->rows += subpath->rows;
+ path->total_cost += subpath->total_cost;
+ }
+ }
+ else /* parallel-aware */
+ {
+ double max_rows = 0;
+ double nonpartial_rows = 0;
+ int i = 0;
+
+ /* Include the non-partial paths total cost */
+ path->total_cost += append_nonpartial_cost(subpaths,
+ num_nonpartial_subpaths,
+ path->parallel_workers);
+
+ /* Calculate startup cost; also add up all the rows for later use */
+ foreach(l, subpaths)
+ {
+ Path *subpath = (Path *) lfirst(l);
+
+ /*
+ * The Append can start returning tuples once the child with the
+ * lowest startup cost has finished its startup. Consider only the
+ * first few subplans, since only those immediately get a worker
+ * assigned.
+ */
+ if (i == 0)
+ path->startup_cost = subpath->startup_cost;
+ else if (i < path->parallel_workers)
+ path->startup_cost = Min(path->startup_cost,
+ subpath->startup_cost);
+
+ if (i < num_nonpartial_subpaths)
+ {
+ nonpartial_rows += subpath->rows;
+
+ /* Also keep track of max rows for any given subpath */
+ max_rows = Max(max_rows, subpath->rows);
+ }
+
+ i++;
+ }
+
+ /*
+ * As an approximation, the rows returned by the non-partial subpaths
+ * are taken to be their total divided by the number of workers. But if
+ * the row counts are highly unequal across paths, that figure can be an
+ * underestimate, so also clamp it to be no less than the largest row
+ * count of any single subpath.
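+ * (For instance, with two non-partial subpaths of 1000 and 10 rows and
+ * 2 workers, 1010 / 2 = 505 would be an underestimate, so 1000 is used
+ * instead; the numbers are purely illustrative.)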
+ */
+ nonpartial_rows /= path->parallel_workers;
+ path->rows += Max(nonpartial_rows, max_rows);
+
+ /* Calculate partial paths cost. */
+ if (list_length(subpaths) > num_nonpartial_subpaths)
+ {
+ /* Compute rows and costs as sums of subplan rows and costs. */
+ for_each_cell(l, list_nth_cell(subpaths, num_nonpartial_subpaths))
+ {
+ Path *subpath = (Path *) lfirst(l);
+
+ path->rows += subpath->rows;
+ path->total_cost += subpath->total_cost;
+ }
+ }
+ }
+}
+
+/*
* cost_merge_append
* Determines and returns the cost of a MergeAppend node.
*
diff --git a/src/backend/optimizer/path/joinrels.c b/src/backend/optimizer/path/joinrels.c
index 6ee2350..0eee647 100644
--- a/src/backend/optimizer/path/joinrels.c
+++ b/src/backend/optimizer/path/joinrels.c
@@ -1217,7 +1217,8 @@ mark_dummy_rel(RelOptInfo *rel)
rel->partial_pathlist = NIL;
/* Set up the dummy path */
- add_path(rel, (Path *) create_append_path(rel, NIL, NULL, 0, NIL));
+ add_path(rel, (Path *) create_append_path(rel, NIL, NIL, NULL,
+ 0, false, NIL));
/* Set or update cheapest_total_path and related fields */
set_cheapest(rel);
diff --git a/src/backend/optimizer/plan/createplan.c b/src/backend/optimizer/plan/createplan.c
index 2821662..055ac64 100644
--- a/src/backend/optimizer/plan/createplan.c
+++ b/src/backend/optimizer/plan/createplan.c
@@ -203,7 +203,8 @@ static NamedTuplestoreScan *make_namedtuplestorescan(List *qptlist, List *qpqual
Index scanrelid, char *enrname);
static WorkTableScan *make_worktablescan(List *qptlist, List *qpqual,
Index scanrelid, int wtParam);
-static Append *make_append(List *appendplans, List *tlist, List *partitioned_rels);
+static Append *make_append(List *appendplans, int first_partial_plan,
+ List *tlist, List *partitioned_rels);
static RecursiveUnion *make_recursive_union(List *tlist,
Plan *lefttree,
Plan *righttree,
@@ -1049,7 +1050,8 @@ create_append_plan(PlannerInfo *root, AppendPath *best_path)
* parent-rel Vars it'll be asked to emit.
*/
- plan = make_append(subplans, tlist, best_path->partitioned_rels);
+ plan = make_append(subplans, best_path->first_partial_path,
+ tlist, best_path->partitioned_rels);
copy_generic_path_info(&plan->plan, (Path *) best_path);
@@ -5274,7 +5276,7 @@ make_foreignscan(List *qptlist,
}
static Append *
-make_append(List *appendplans, List *tlist, List *partitioned_rels)
+make_append(List *appendplans, int first_partial_plan, List *tlist, List *partitioned_rels)
{
Append *node = makeNode(Append);
Plan *plan = &node->plan;
@@ -5285,6 +5287,7 @@ make_append(List *appendplans, List *tlist, List *partitioned_rels)
plan->righttree = NULL;
node->partitioned_rels = partitioned_rels;
node->appendplans = appendplans;
+ node->first_partial_plan = first_partial_plan;
return node;
}
diff --git a/src/backend/optimizer/plan/planner.c b/src/backend/optimizer/plan/planner.c
index 9662302..d9fab6b 100644
--- a/src/backend/optimizer/plan/planner.c
+++ b/src/backend/optimizer/plan/planner.c
@@ -3603,8 +3603,10 @@ create_grouping_paths(PlannerInfo *root,
path = (Path *)
create_append_path(grouped_rel,
paths,
+ NIL,
NULL,
0,
+ false,
NIL);
path->pathtarget = target;
}
diff --git a/src/backend/optimizer/prep/prepunion.c b/src/backend/optimizer/prep/prepunion.c
index e73c819..4af5740 100644
--- a/src/backend/optimizer/prep/prepunion.c
+++ b/src/backend/optimizer/prep/prepunion.c
@@ -578,8 +578,8 @@ generate_union_path(SetOperationStmt *op, PlannerInfo *root,
/*
* Append the child results together.
*/
- path = (Path *) create_append_path(result_rel, pathlist, NULL, 0, NIL);
-
+ path = (Path *) create_append_path(result_rel, pathlist, NIL,
+ NULL, 0, false, NIL);
/* We have to manually jam the right tlist into the path; ick */
path->pathtarget = create_pathtarget(root, tlist);
@@ -690,7 +690,8 @@ generate_nonunion_path(SetOperationStmt *op, PlannerInfo *root,
/*
* Append the child results together.
*/
- path = (Path *) create_append_path(result_rel, pathlist, NULL, 0, NIL);
+ path = (Path *) create_append_path(result_rel, pathlist, NIL,
+ NULL, 0, false, NIL);
/* We have to manually jam the right tlist into the path; ick */
path->pathtarget = create_pathtarget(root, tlist);
diff --git a/src/backend/optimizer/util/pathnode.c b/src/backend/optimizer/util/pathnode.c
index 26567cb..846f33e 100644
--- a/src/backend/optimizer/util/pathnode.c
+++ b/src/backend/optimizer/util/pathnode.c
@@ -46,6 +46,7 @@ typedef enum
#define STD_FUZZ_FACTOR 1.01
static List *translate_sub_tlist(List *tlist, int relid);
+static int append_total_cost_compare(const void *a, const void *b);
/*****************************************************************************
@@ -1193,6 +1194,70 @@ create_tidscan_path(PlannerInfo *root, RelOptInfo *rel, List *tidquals,
}
/*
+ * get_append_num_workers
+ * Return the number of workers to request for partial append path.
+ */
+int
+get_append_num_workers(List *partial_subpaths, List *nonpartial_subpaths)
+{
+ ListCell *lc;
+ double log2w;
+ int num_workers;
+ int max_per_plan_workers;
+
+ /*
+ * The log2(number_of_subpaths) + 1 formula seems to give an appropriate
+ * number of workers for an Append path that either has a large number of
+ * children (> 100), or has all non-partial subpaths, or has subpaths with
+ * only 1-2 parallel_workers each. However, if a subpath's parallel_workers
+ * is high, this formula is not suitable, because it does not take
+ * per-subpath workers into account. For example, with 3 subplans having
+ * per-subplan workers (2, 8, 8), the Append should get at least 8 workers,
+ * whereas the formula gives 2. In that case it seems better to follow the
+ * method used for calculating parallel_workers of an unpartitioned table,
+ * namely log3(table_size): treat the partitioned table as if its data
+ * belonged to a single unpartitioned table and derive the workers from
+ * that. That works out to logb(b^w1 + b^w2 + b^w3), where w1, w2, ... are
+ * the per-subplan workers and b is some logarithmic base such as 2 or 3;
+ * this evaluates to a value just a bit greater than max(w1, w2, w3). So we
+ * simply use the maximum of the per-subplan workers. But that in turn
+ * gives too few workers when all paths have a single worker (meaning they
+ * are non-partial); e.g. with workers (1, 1, 1, 1, 1, 1) it is better to
+ * allocate 3 workers, whereas the maximum is only 1. So we use whichever
+ * method gives the higher number of workers.
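+ *
+ * As a hypothetical illustration: with 6 subpaths of which two are partial
+ * with 2 workers each, and max_parallel_workers_per_gather = 8, we get
+ * log2w = fls(6) = 3; since 8 > log2w + 1 we don't return early;
+ * max_per_plan_workers is 2; and the result is rint(Max(3, 2) + 1) = 4
+ * workers.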
+ */
+
+ /* Get log2(num_subpaths) */
+ log2w = fls(list_length(partial_subpaths) +
+ list_length(nonpartial_subpaths));
+
+ /* Avoid further calculations if we already crossed max workers limit */
+ if (max_parallel_workers_per_gather <= log2w + 1)
+ return max_parallel_workers_per_gather;
+
+
+ /*
+ * Get the parallel_workers value of the partial subpath having the highest
+ * parallel_workers.
+ */
+ max_per_plan_workers = 1;
+ foreach(lc, partial_subpaths)
+ {
+ Path *subpath = lfirst(lc);
+ max_per_plan_workers = Max(max_per_plan_workers,
+ subpath->parallel_workers);
+ }
+
+ /* Choose the higher of the results of the two formulae */
+ num_workers = rint(Max(log2w, max_per_plan_workers) + 1);
+
+ /* In no case use more than max_parallel_workers_per_gather workers. */
+ num_workers = Min(num_workers, max_parallel_workers_per_gather);
+
+ return num_workers;
+}
+
+/*
* create_append_path
* Creates a path corresponding to an Append plan, returning the
* pathnode.
@@ -1200,8 +1265,11 @@ create_tidscan_path(PlannerInfo *root, RelOptInfo *rel, List *tidquals,
* Note that we must handle subpaths = NIL, representing a dummy access path.
*/
AppendPath *
-create_append_path(RelOptInfo *rel, List *subpaths, Relids required_outer,
- int parallel_workers, List *partitioned_rels)
+create_append_path(RelOptInfo *rel,
+ List *subpaths, List *partial_subpaths,
+ Relids required_outer,
+ int parallel_workers, bool parallel_aware,
+ List *partitioned_rels)
{
AppendPath *pathnode = makeNode(AppendPath);
ListCell *l;
@@ -1211,43 +1279,50 @@ create_append_path(RelOptInfo *rel, List *subpaths, Relids required_outer,
pathnode->path.pathtarget = rel->reltarget;
pathnode->path.param_info = get_appendrel_parampathinfo(rel,
required_outer);
- pathnode->path.parallel_aware = false;
+ pathnode->path.parallel_aware = (parallel_aware && parallel_workers > 0);
pathnode->path.parallel_safe = rel->consider_parallel;
pathnode->path.parallel_workers = parallel_workers;
pathnode->path.pathkeys = NIL; /* result is always considered unsorted */
pathnode->partitioned_rels = list_copy(partitioned_rels);
- pathnode->subpaths = subpaths;
- /*
- * We don't bother with inventing a cost_append(), but just do it here.
- *
- * Compute rows and costs as sums of subplan rows and costs. We charge
- * nothing extra for the Append itself, which perhaps is too optimistic,
- * but since it doesn't do any selection or projection, it is a pretty
- * cheap node.
- */
- pathnode->path.rows = 0;
- pathnode->path.startup_cost = 0;
- pathnode->path.total_cost = 0;
+ /* For parallel append, non-partial paths are sorted by descending costs */
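+ /*
+ * (Putting the costliest non-partial subpaths first means workers start
+ * on them earliest, while the leader, which scans the subplan list from
+ * the back in exec_append_leader_next(), avoids getting stuck with the
+ * most expensive ones.)
+ */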
+ if (pathnode->path.parallel_aware)
+ subpaths = list_qsort(subpaths, append_total_cost_compare);
+
+ pathnode->first_partial_path = list_length(subpaths);
+ pathnode->subpaths = list_concat(subpaths, partial_subpaths);
+
foreach(l, subpaths)
{
Path *subpath = (Path *) lfirst(l);
- pathnode->path.rows += subpath->rows;
-
- if (l == list_head(subpaths)) /* first node? */
- pathnode->path.startup_cost = subpath->startup_cost;
- pathnode->path.total_cost += subpath->total_cost;
pathnode->path.parallel_safe = pathnode->path.parallel_safe &&
- subpath->parallel_safe;
+ subpath->parallel_safe;
/* All child paths must have same parameterization */
Assert(bms_equal(PATH_REQ_OUTER(subpath), required_outer));
}
+ cost_append(&pathnode->path, pathnode->subpaths,
+ pathnode->first_partial_path);
+
return pathnode;
}
+static int
+append_total_cost_compare(const void *a, const void *b)
+{
+ Path *path1 = (Path *) lfirst(*(ListCell **) a);
+ Path *path2 = (Path *) lfirst(*(ListCell **) b);
+
+ if (path1->total_cost > path2->total_cost)
+ return -1;
+ if (path1->total_cost < path2->total_cost)
+ return 1;
+
+ return 0;
+}
+
/*
* create_merge_append_path
* Creates a path corresponding to a MergeAppend plan, returning the
diff --git a/src/backend/storage/lmgr/lwlock.c b/src/backend/storage/lmgr/lwlock.c
index 82a1cf5..f2770fa 100644
--- a/src/backend/storage/lmgr/lwlock.c
+++ b/src/backend/storage/lmgr/lwlock.c
@@ -494,7 +494,7 @@ RegisterLWLockTranches(void)
if (LWLockTrancheArray == NULL)
{
- LWLockTranchesAllocated = 64;
+ LWLockTranchesAllocated = 128;
LWLockTrancheArray = (char **)
MemoryContextAllocZero(TopMemoryContext,
LWLockTranchesAllocated * sizeof(char *));
@@ -511,6 +511,7 @@ RegisterLWLockTranches(void)
LWLockRegisterTranche(LWTRANCHE_PARALLEL_QUERY_DSA,
"parallel_query_dsa");
LWLockRegisterTranche(LWTRANCHE_TBM, "tbm");
+ LWLockRegisterTranche(LWTRANCHE_PARALLEL_APPEND, "parallel_append");
/* Register named tranches. */
for (i = 0; i < NamedLWLockTrancheRequests; i++)
diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c
index 246fea8..0782aa3 100644
--- a/src/backend/utils/misc/guc.c
+++ b/src/backend/utils/misc/guc.c
@@ -910,6 +910,15 @@ static struct config_bool ConfigureNamesBool[] =
true,
NULL, NULL, NULL
},
+ {
+ {"enable_parallelappend", PGC_USERSET, QUERY_TUNING_METHOD,
+ gettext_noop("Enables the planner's use of parallel append plans."),
+ NULL
+ },
+ &enable_parallelappend,
+ true,
+ NULL, NULL, NULL
+ },
{
{"geqo", PGC_USERSET, QUERY_TUNING_GEQO,
diff --git a/src/backend/utils/misc/postgresql.conf.sample b/src/backend/utils/misc/postgresql.conf.sample
index df5d2f3..0a079b2 100644
--- a/src/backend/utils/misc/postgresql.conf.sample
+++ b/src/backend/utils/misc/postgresql.conf.sample
@@ -297,6 +297,7 @@
#enable_material = on
#enable_mergejoin = on
#enable_nestloop = on
+#enable_parallelappend = on
#enable_seqscan = on
#enable_sort = on
#enable_tidscan = on
diff --git a/src/include/executor/nodeAppend.h b/src/include/executor/nodeAppend.h
index 4e38a13..7d9e881 100644
--- a/src/include/executor/nodeAppend.h
+++ b/src/include/executor/nodeAppend.h
@@ -14,10 +14,14 @@
#ifndef NODEAPPEND_H
#define NODEAPPEND_H
+#include "access/parallel.h"
#include "nodes/execnodes.h"
extern AppendState *ExecInitAppend(Append *node, EState *estate, int eflags);
extern void ExecEndAppend(AppendState *node);
extern void ExecReScanAppend(AppendState *node);
+extern void ExecAppendEstimate(AppendState *node, ParallelContext *pcxt);
+extern void ExecAppendInitializeDSM(AppendState *node, ParallelContext *pcxt);
+extern void ExecAppendInitializeWorker(AppendState *node, shm_toc *toc);
#endif /* NODEAPPEND_H */
diff --git a/src/include/nodes/execnodes.h b/src/include/nodes/execnodes.h
index 6cf128a..2642502 100644
--- a/src/include/nodes/execnodes.h
+++ b/src/include/nodes/execnodes.h
@@ -21,6 +21,7 @@
#include "lib/pairingheap.h"
#include "nodes/params.h"
#include "nodes/plannodes.h"
+#include "storage/spin.h"
#include "utils/hsearch.h"
#include "utils/queryenvironment.h"
#include "utils/reltrigger.h"
@@ -995,12 +996,15 @@ typedef struct ModifyTableState
* whichplan which plan is being executed (0 .. n-1)
* ----------------
*/
+struct ParallelAppendDescData;
typedef struct AppendState
{
PlanState ps; /* its first field is NodeTag */
PlanState **appendplans; /* array of PlanStates for my inputs */
int as_nplans;
int as_whichplan;
+ struct ParallelAppendDescData *as_padesc; /* parallel coordination info */
+ Size pappend_len; /* size of parallel coordination info */
} AppendState;
/* ----------------
diff --git a/src/include/nodes/pg_list.h b/src/include/nodes/pg_list.h
index 667d5e2..711db92 100644
--- a/src/include/nodes/pg_list.h
+++ b/src/include/nodes/pg_list.h
@@ -269,6 +269,9 @@ extern void list_free_deep(List *list);
extern List *list_copy(const List *list);
extern List *list_copy_tail(const List *list, int nskip);
+typedef int (*list_qsort_comparator) (const void *a, const void *b);
+extern List *list_qsort(const List *list, list_qsort_comparator cmp);
+
/*
* To ease migration to the new list API, a set of compatibility
* macros are provided that reduce the impact of the list API changes
diff --git a/src/include/nodes/plannodes.h b/src/include/nodes/plannodes.h
index a382331..1678497 100644
--- a/src/include/nodes/plannodes.h
+++ b/src/include/nodes/plannodes.h
@@ -248,6 +248,7 @@ typedef struct Append
/* RT indexes of non-leaf tables in a partition tree */
List *partitioned_rels;
List *appendplans;
+ int first_partial_plan;
} Append;
/* ----------------
diff --git a/src/include/nodes/relation.h b/src/include/nodes/relation.h
index a39e59d..bdf1152 100644
--- a/src/include/nodes/relation.h
+++ b/src/include/nodes/relation.h
@@ -1167,10 +1167,14 @@ typedef struct CustomPath
* AppendPath represents an Append plan, ie, successive execution of
* several member plans.
*
+ * For partial Append, 'subpaths' contains non-partial subpaths followed by
+ * partial subpaths.
+ *
* Note: it is possible for "subpaths" to contain only one, or even no,
* elements. These cases are optimized during create_append_plan.
* In particular, an AppendPath with no subpaths is a "dummy" path that
* is created to represent the case that a relation is provably empty.
+ *
*/
typedef struct AppendPath
{
@@ -1178,6 +1182,9 @@ typedef struct AppendPath
/* RT indexes of non-leaf tables in a partition tree */
List *partitioned_rels;
List *subpaths; /* list of component Paths */
+
+ /* Index of first partial path in subpaths */
+ int first_partial_path;
} AppendPath;
#define IS_DUMMY_PATH(p) \
diff --git a/src/include/optimizer/cost.h b/src/include/optimizer/cost.h
index 63feba0..8e66cf0 100644
--- a/src/include/optimizer/cost.h
+++ b/src/include/optimizer/cost.h
@@ -67,6 +67,7 @@ extern bool enable_material;
extern bool enable_mergejoin;
extern bool enable_hashjoin;
extern bool enable_gathermerge;
+extern bool enable_parallelappend;
extern int constraint_exclusion;
extern double clamp_row_est(double nrows);
@@ -105,6 +106,8 @@ extern void cost_sort(Path *path, PlannerInfo *root,
List *pathkeys, Cost input_cost, double tuples, int width,
Cost comparison_cost, int sort_mem,
double limit_tuples);
+extern void cost_append(Path *path, List *subpaths,
+ int num_nonpartial_subpaths);
extern void cost_merge_append(Path *path, PlannerInfo *root,
List *pathkeys, int n_streams,
Cost input_startup_cost, Cost input_total_cost,
diff --git a/src/include/optimizer/pathnode.h b/src/include/optimizer/pathnode.h
index e372f88..d578a5d 100644
--- a/src/include/optimizer/pathnode.h
+++ b/src/include/optimizer/pathnode.h
@@ -14,6 +14,7 @@
#ifndef PATHNODE_H
#define PATHNODE_H
+#include "nodes/bitmapset.h"
#include "nodes/relation.h"
@@ -63,9 +64,13 @@ extern BitmapOrPath *create_bitmap_or_path(PlannerInfo *root,
List *bitmapquals);
extern TidPath *create_tidscan_path(PlannerInfo *root, RelOptInfo *rel,
List *tidquals, Relids required_outer);
-extern AppendPath *create_append_path(RelOptInfo *rel, List *subpaths,
- Relids required_outer, int parallel_workers,
- List *partitioned_rels);
+extern int get_append_num_workers(List *partial_subpaths,
+ List *nonpartial_subpaths);
+extern AppendPath *create_append_path(RelOptInfo *rel,
+ List *subpaths, List *partial_subpaths,
+ Relids required_outer,
+ int parallel_workers, bool parallel_aware,
+ List *partitioned_rels);
extern MergeAppendPath *create_merge_append_path(PlannerInfo *root,
RelOptInfo *rel,
List *subpaths,
diff --git a/src/include/storage/lwlock.h b/src/include/storage/lwlock.h
index 3d16132..35adf12 100644
--- a/src/include/storage/lwlock.h
+++ b/src/include/storage/lwlock.h
@@ -213,6 +213,7 @@ typedef enum BuiltinTrancheIds
LWTRANCHE_PREDICATE_LOCK_MANAGER,
LWTRANCHE_PARALLEL_QUERY_DSA,
LWTRANCHE_TBM,
+ LWTRANCHE_PARALLEL_APPEND,
LWTRANCHE_FIRST_USER_DEFINED
} BuiltinTrancheIds;
diff --git a/src/test/regress/expected/inherit.out b/src/test/regress/expected/inherit.out
index 1fa9650..7a5b3c7 100644
--- a/src/test/regress/expected/inherit.out
+++ b/src/test/regress/expected/inherit.out
@@ -1382,6 +1382,7 @@ select min(1-id) from matest0;
reset enable_indexscan;
set enable_seqscan = off; -- plan with fewest seqscans should be merge
+set enable_parallelappend = off; -- Don't let parallel-append interfere
explain (verbose, costs off) select * from matest0 order by 1-id;
QUERY PLAN
------------------------------------------------------------------
@@ -1448,6 +1449,7 @@ select min(1-id) from matest0;
(1 row)
reset enable_seqscan;
+reset enable_parallelappend;
drop table matest0 cascade;
NOTICE: drop cascades to 3 other objects
DETAIL: drop cascades to table matest1
diff --git a/src/test/regress/expected/select_parallel.out b/src/test/regress/expected/select_parallel.out
index 888da5a..daf00c2 100644
--- a/src/test/regress/expected/select_parallel.out
+++ b/src/test/regress/expected/select_parallel.out
@@ -11,15 +11,16 @@ set parallel_setup_cost=0;
set parallel_tuple_cost=0;
set min_parallel_table_scan_size=0;
set max_parallel_workers_per_gather=4;
+-- test Parallel Append.
explain (costs off)
- select count(*) from a_star;
+ select round(avg(aa)), sum(aa) from a_star;
QUERY PLAN
-----------------------------------------------------
Finalize Aggregate
-> Gather
- Workers Planned: 1
+ Workers Planned: 4
-> Partial Aggregate
- -> Append
+ -> Parallel Append
-> Parallel Seq Scan on a_star
-> Parallel Seq Scan on b_star
-> Parallel Seq Scan on c_star
@@ -28,12 +29,40 @@ explain (costs off)
-> Parallel Seq Scan on f_star
(11 rows)
-select count(*) from a_star;
- count
--------
- 50
+select round(avg(aa)), sum(aa) from a_star;
+ round | sum
+-------+-----
+ 14 | 355
+(1 row)
+
+-- Mix of partial and non-partial subplans.
+alter table c_star set (parallel_workers = 0);
+alter table d_star set (parallel_workers = 0);
+explain (costs off)
+ select round(avg(aa)), sum(aa) from a_star;
+ QUERY PLAN
+-----------------------------------------------------
+ Finalize Aggregate
+ -> Gather
+ Workers Planned: 4
+ -> Partial Aggregate
+ -> Parallel Append
+ -> Seq Scan on d_star
+ -> Seq Scan on c_star
+ -> Parallel Seq Scan on a_star
+ -> Parallel Seq Scan on b_star
+ -> Parallel Seq Scan on e_star
+ -> Parallel Seq Scan on f_star
+(11 rows)
+
+select round(avg(aa)), sum(aa) from a_star;
+ round | sum
+-------+-----
+ 14 | 355
(1 row)
+alter table c_star reset (parallel_workers);
+alter table d_star reset (parallel_workers);
-- test that parallel_restricted function doesn't run in worker
alter table tenk1 set (parallel_workers = 4);
explain (verbose, costs off)
diff --git a/src/test/regress/expected/sysviews.out b/src/test/regress/expected/sysviews.out
index 568b783..97a9843 100644
--- a/src/test/regress/expected/sysviews.out
+++ b/src/test/regress/expected/sysviews.out
@@ -70,21 +70,22 @@ select count(*) >= 0 as ok from pg_prepared_xacts;
-- This is to record the prevailing planner enable_foo settings during
-- a regression test run.
select name, setting from pg_settings where name like 'enable%';
- name | setting
-----------------------+---------
- enable_bitmapscan | on
- enable_gathermerge | on
- enable_hashagg | on
- enable_hashjoin | on
- enable_indexonlyscan | on
- enable_indexscan | on
- enable_material | on
- enable_mergejoin | on
- enable_nestloop | on
- enable_seqscan | on
- enable_sort | on
- enable_tidscan | on
-(12 rows)
+ name | setting
+-----------------------+---------
+ enable_bitmapscan | on
+ enable_gathermerge | on
+ enable_hashagg | on
+ enable_hashjoin | on
+ enable_indexonlyscan | on
+ enable_indexscan | on
+ enable_material | on
+ enable_mergejoin | on
+ enable_nestloop | on
+ enable_parallelappend | on
+ enable_seqscan | on
+ enable_sort | on
+ enable_tidscan | on
+(13 rows)
-- Test that the pg_timezone_names and pg_timezone_abbrevs views are
-- more-or-less working. We can't test their contents in any great detail
diff --git a/src/test/regress/sql/inherit.sql b/src/test/regress/sql/inherit.sql
index c96580c..60ac387 100644
--- a/src/test/regress/sql/inherit.sql
+++ b/src/test/regress/sql/inherit.sql
@@ -491,11 +491,13 @@ select min(1-id) from matest0;
reset enable_indexscan;
set enable_seqscan = off; -- plan with fewest seqscans should be merge
+set enable_parallelappend = off; -- Don't let parallel-append interfere
explain (verbose, costs off) select * from matest0 order by 1-id;
select * from matest0 order by 1-id;
explain (verbose, costs off) select min(1-id) from matest0;
select min(1-id) from matest0;
reset enable_seqscan;
+reset enable_parallelappend;
drop table matest0 cascade;
diff --git a/src/test/regress/sql/select_parallel.sql b/src/test/regress/sql/select_parallel.sql
index cefb5a2..5aafc4d 100644
--- a/src/test/regress/sql/select_parallel.sql
+++ b/src/test/regress/sql/select_parallel.sql
@@ -15,9 +15,18 @@ set parallel_tuple_cost=0;
set min_parallel_table_scan_size=0;
set max_parallel_workers_per_gather=4;
+-- test Parallel Append.
explain (costs off)
- select count(*) from a_star;
-select count(*) from a_star;
+ select round(avg(aa)), sum(aa) from a_star;
+select round(avg(aa)), sum(aa) from a_star;
+-- Mix of partial and non-partial subplans.
+alter table c_star set (parallel_workers = 0);
+alter table d_star set (parallel_workers = 0);
+explain (costs off)
+ select round(avg(aa)), sum(aa) from a_star;
+select round(avg(aa)), sum(aa) from a_star;
+alter table c_star reset (parallel_workers);
+alter table d_star reset (parallel_workers);
-- test that parallel_restricted function doesn't run in worker
alter table tenk1 set (parallel_workers = 4);