From 7c5fa754fada96553930820b970da876fb1dc468 Mon Sep 17 00:00:00 2001 From: Robert Haas Date: Fri, 25 Sep 2015 17:50:19 -0400 Subject: [PATCH 2/2] Test code. --- src/backend/executor/execMain.c | 47 ++++++++++++++++++++++++++++++++++++- src/backend/executor/execParallel.c | 4 +++- src/backend/utils/misc/guc.c | 11 +++++++++ src/include/utils/guc.h | 2 ++ 4 files changed, 62 insertions(+), 2 deletions(-) diff --git a/src/backend/executor/execMain.c b/src/backend/executor/execMain.c index 85ff46b..4863afd 100644 --- a/src/backend/executor/execMain.c +++ b/src/backend/executor/execMain.c @@ -45,6 +45,8 @@ #include "commands/matview.h" #include "commands/trigger.h" #include "executor/execdebug.h" +#include "executor/execParallel.h" +#include "executor/tqueue.h" #include "foreign/fdwapi.h" #include "mb/pg_wchar.h" #include "miscadmin.h" @@ -339,13 +341,56 @@ standard_ExecutorRun(QueryDesc *queryDesc, * run plan */ if (!ScanDirectionIsNoMovement(direction)) - ExecutePlan(estate, + { + if (force_parallel_worker && !IsParallelWorker()) + { + ParallelExecutorInfo *pei; + TupleQueueFunnel *funnel; + TupleDesc tupType; + TupleTableSlot *slot; + + EnterParallelMode(); + + /* Utterly disregarding sanity checks, let's try this out... */ + pei = ExecInitParallelPlan(queryDesc->planstate, estate, 1); + LaunchParallelWorkers(pei->pcxt); + + /* Set up to send results wherever they're supposed to go. */ + funnel = CreateTupleQueueFunnel(); + RegisterTupleQueueOnFunnel(funnel, pei->tqueue[0]); + tupType = ExecGetResultType(queryDesc->planstate); + slot = MakeSingleTupleTableSlot(tupType); + + /* Read tuples from the worker and send them to the receiver. */ + for (;;) + { + HeapTuple tup; + bool done; + + tup = TupleQueueFunnelNext(funnel, false, &done); + if (done) + break; + Assert(tup != NULL); + ExecStoreTuple(tup, slot, InvalidBuffer, true); + (*dest->receiveSlot) (slot, dest); + } + + /* Clean up. */ + ExecParallelFinish(pei); + DestroyParallelContext(pei->pcxt); + ExitParallelMode(); + } + else + { + ExecutePlan(estate, queryDesc->planstate, operation, sendTuples, count, direction, dest); + } + } /* * shutdown tuple receiver, if we started it diff --git a/src/backend/executor/execParallel.c b/src/backend/executor/execParallel.c index a409a9a..9c8bf4b 100644 --- a/src/backend/executor/execParallel.c +++ b/src/backend/executor/execParallel.c @@ -568,7 +568,6 @@ ParallelQueryMain(dsm_segment *seg, shm_toc *toc) ExecutorStart(queryDesc, 0); ExecutorRun(queryDesc, ForwardScanDirection, 0L); ExecutorFinish(queryDesc); - ExecutorEnd(queryDesc); /* Report buffer usage during parallel execution. */ buffer_usage = shm_toc_lookup(toc, PARALLEL_KEY_BUFFER_USAGE); @@ -579,6 +578,9 @@ ParallelQueryMain(dsm_segment *seg, shm_toc *toc) ExecParallelReportInstrumentation(queryDesc->planstate, instrumentation); + /* Must do this after capturing instrumentation. */ + ExecutorEnd(queryDesc); + /* Cleanup. */ FreeQueryDesc(queryDesc); (*receiver->rDestroy) (receiver); diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c index 17053af..11c54dd 100644 --- a/src/backend/utils/misc/guc.c +++ b/src/backend/utils/misc/guc.c @@ -112,6 +112,8 @@ extern char *temp_tablespaces; extern bool ignore_checksum_failure; extern bool synchronize_seqscans; +bool force_parallel_worker; + #ifdef TRACE_SYNCSCAN extern bool trace_syncscan; #endif @@ -755,6 +757,15 @@ static const unit_conversion time_unit_conversion_table[] = static struct config_bool ConfigureNamesBool[] = { { + {"force_parallel_worker", PGC_USERSET, QUERY_TUNING_METHOD, + gettext_noop("Force use of a parallel worker in the executor."), + NULL + }, + &force_parallel_worker, + false, + NULL, NULL, NULL + }, + { {"enable_seqscan", PGC_USERSET, QUERY_TUNING_METHOD, gettext_noop("Enables the planner's use of sequential-scan plans."), NULL diff --git a/src/include/utils/guc.h b/src/include/utils/guc.h index dc167f9..702e067 100644 --- a/src/include/utils/guc.h +++ b/src/include/utils/guc.h @@ -433,4 +433,6 @@ extern void assign_search_path(const char *newval, void *extra); extern bool check_wal_buffers(int *newval, void **extra, GucSource source); extern void assign_xlog_sync_method(int new_sync_method, void *extra); +extern bool force_parallel_worker; + #endif /* GUC_H */ -- 2.3.8 (Apple Git-58)