From 2b1069b3ad1febc7ec76615607087d1484a8dcba Mon Sep 17 00:00:00 2001 From: Robert Haas Date: Thu, 15 Oct 2015 19:19:40 -0400 Subject: [PATCH 1/3] Make it possible to relaunch workers for an existing parallel context. --- src/backend/access/transam/README.parallel | 5 ++++ src/backend/access/transam/parallel.c | 46 ++++++++++++++++++++++++++++++ src/include/access/parallel.h | 1 + 3 files changed, 52 insertions(+) diff --git a/src/backend/access/transam/README.parallel b/src/backend/access/transam/README.parallel index 1005186..dfcbafa 100644 --- a/src/backend/access/transam/README.parallel +++ b/src/backend/access/transam/README.parallel @@ -221,3 +221,8 @@ pattern looks like this: DestroyParallelContext(pcxt); ExitParallelMode(); + +If desired, after WaitForParallelWorkersToFinish() has been called, another +call to LaunchParallelWorkers() can be made using the same parallel context. +Calls to these two functions can be alternated any number of times before +destroying the parallel context. diff --git a/src/backend/access/transam/parallel.c b/src/backend/access/transam/parallel.c index 29d6ed5..8363c0e 100644 --- a/src/backend/access/transam/parallel.c +++ b/src/backend/access/transam/parallel.c @@ -392,6 +392,49 @@ LaunchParallelWorkers(ParallelContext *pcxt) /* We might be running in a short-lived memory context. */ oldcontext = MemoryContextSwitchTo(TopTransactionContext); + /* + * A nonzero nworkers_launched is taken to mean that workers were launched + * from this context before. Relaunching is allowed only once they have + * all exited; handles, fixed state and error queues are then reset below. + * NOTE(review): this patch never zeroes the counter; confirm that is OK. + */ + if (pcxt->nworkers_launched > 0) + { + FixedParallelState *fps; + char *error_queue_space; + + /* Release old worker handles; a non-NULL error_mqh raises an error. 
*/ + for (i = 0; i < pcxt->nworkers; ++i) + { + if (pcxt->worker[i].error_mqh != NULL) + elog(ERROR, "previously launched worker still alive"); + if (pcxt->worker[i].bgwhandle != NULL) + { + pfree(pcxt->worker[i].bgwhandle); + pcxt->worker[i].bgwhandle = NULL; + } + } + + /* Zero workers_attached and last_xlog_end in the fixed parallel state. */ + fps = shm_toc_lookup(pcxt->toc, PARALLEL_KEY_FIXED); + fps->workers_attached = 0; + fps->last_xlog_end = 0; + + /* Recreate the error queue of every slot, with MyProc as receiver. */ + error_queue_space = + shm_toc_lookup(pcxt->toc, PARALLEL_KEY_ERROR_QUEUE); + for (i = 0; i < pcxt->nworkers; ++i) + { + char *start; + shm_mq *mq; + + start = error_queue_space + i * PARALLEL_ERROR_QUEUE_SIZE; + mq = shm_mq_create(start, PARALLEL_ERROR_QUEUE_SIZE); + shm_mq_set_receiver(mq, MyProc); + pcxt->worker[i].error_mqh = shm_mq_attach(mq, pcxt->seg, NULL); + } + } + /* Configure a worker. */ snprintf(worker.bgw_name, BGW_MAXLEN, "parallel worker for PID %d", MyProcPid); @@ -416,8 +459,11 @@ LaunchParallelWorkers(ParallelContext *pcxt) if (!any_registrations_failed && RegisterDynamicBackgroundWorker(&worker, &pcxt->worker[i].bgwhandle)) + { shm_mq_set_handle(pcxt->worker[i].error_mqh, pcxt->worker[i].bgwhandle); + pcxt->nworkers_launched++; + } else { /* diff --git a/src/include/access/parallel.h b/src/include/access/parallel.h index b029c1e..57635c8 100644 --- a/src/include/access/parallel.h +++ b/src/include/access/parallel.h @@ -35,6 +35,7 @@ typedef struct ParallelContext dlist_node node; SubTransactionId subid; int nworkers; + int nworkers_launched; parallel_worker_main_type entrypoint; char *library_name; char *function_name; -- 2.3.8 (Apple Git-58)