From 0d04f79cb51d6be0ced9c6561cfca5bfe18c4bdd Mon Sep 17 00:00:00 2001 From: Jaime Casanova Date: Wed, 15 Dec 2021 12:14:44 -0500 Subject: [PATCH] Add --jobs-per-disk option to allow multiple processes per tablespace This option is independent of the --jobs one. It will fork new processes to copy the different segments of a relfilenode in parallel. --- src/bin/pg_upgrade/option.c | 8 ++- src/bin/pg_upgrade/parallel.c | 93 ++++++++++++++++++++++++++++++++ src/bin/pg_upgrade/pg_upgrade.h | 4 ++ src/bin/pg_upgrade/relfilenode.c | 59 +++++++++++--------- 4 files changed, 139 insertions(+), 25 deletions(-) diff --git a/src/bin/pg_upgrade/option.c b/src/bin/pg_upgrade/option.c index 66fe16964e..46b1913a42 100644 --- a/src/bin/pg_upgrade/option.c +++ b/src/bin/pg_upgrade/option.c @@ -54,6 +54,7 @@ parseCommandLine(int argc, char *argv[]) {"link", no_argument, NULL, 'k'}, {"retain", no_argument, NULL, 'r'}, {"jobs", required_argument, NULL, 'j'}, + {"jobs-per-disks", required_argument, NULL, 'J'}, {"socketdir", required_argument, NULL, 's'}, {"verbose", no_argument, NULL, 'v'}, {"clone", no_argument, NULL, 1}, @@ -103,7 +104,7 @@ parseCommandLine(int argc, char *argv[]) if (os_user_effective_id == 0) pg_fatal("%s: cannot be run as root\n", os_info.progname); - while ((option = getopt_long(argc, argv, "d:D:b:B:cj:kNo:O:p:P:rs:U:v", + while ((option = getopt_long(argc, argv, "d:D:b:B:cj:J:kNo:O:p:P:rs:U:v", long_options, &optindex)) != -1) { switch (option) @@ -132,6 +133,10 @@ parseCommandLine(int argc, char *argv[]) user_opts.jobs = atoi(optarg); break; + case 'J': + user_opts.jobs_per_disk = atoi(optarg); + break; + case 'k': user_opts.transfer_mode = TRANSFER_MODE_LINK; break; @@ -291,6 +296,7 @@ usage(void) printf(_(" -d, --old-datadir=DATADIR old cluster data directory\n")); printf(_(" -D, --new-datadir=DATADIR new cluster data directory\n")); printf(_(" -j, --jobs=NUM number of simultaneous processes or threads to use\n")); + printf(_(" -J, --jobs_per_disk=NUM 
number of simultaneous processes or threads to use per tablespace\n")); printf(_(" -k, --link link instead of copying files to new cluster\n")); printf(_(" -N, --no-sync do not wait for changes to be written safely to disk\n")); printf(_(" -o, --old-options=OPTIONS old cluster options to pass to the server\n")); diff --git a/src/bin/pg_upgrade/parallel.c b/src/bin/pg_upgrade/parallel.c index ee7364da3b..82f698a9ab 100644 --- a/src/bin/pg_upgrade/parallel.c +++ b/src/bin/pg_upgrade/parallel.c @@ -17,6 +17,9 @@ #include "pg_upgrade.h" static int parallel_jobs; +static int current_jobs = 0; + +static bool reap_subchild(bool wait_for_child); #ifdef WIN32 /* @@ -277,6 +280,60 @@ win32_transfer_all_new_dbs(transfer_thread_arg *args) #endif + +/* + * parallel_process_relfile_segment() + * + * Copy or link file from old cluster to new one. If vm_must_add_frozenbit + * is true, visibility map forks are converted and rewritten, even in link + * mode. + */ +void +parallel_process_relfile_segment(FileNameMap *map, const char *type_suffix, bool vm_must_add_frozenbit, const char *old_file, const char *new_file) +{ +#ifndef WIN32 + pid_t child; +#else + HANDLE child; + transfer_thread_arg *new_arg; +#endif + if (user_opts.jobs <= 1 || user_opts.jobs_per_disk <= 1) + process_relfile_segment(map, type_suffix, vm_must_add_frozenbit, old_file, new_file); + else + { + /* parallel */ + + /* harvest any dead children */ + while (reap_subchild(false) == true) + ; + + /* must we wait for a dead child? 
use at most jobs_per_disk children per tablespace */ + if (current_jobs >= user_opts.jobs_per_disk) + reap_subchild(true); + + /* set this before we start the job */ + current_jobs++; + + /* Ensure stdio state is quiesced before forking */ + fflush(NULL); + +#ifndef WIN32 + child = fork(); + if (child == 0) + { + process_relfile_segment(map, type_suffix, vm_must_add_frozenbit, old_file, new_file); + /* use _exit to skip atexit() functions */ + _exit(0); + } + else if (child < 0) + /* fork failed */ + pg_fatal("could not create worker process: %s\n", strerror(errno)); +#endif + } +} + + + /* * collect status from a completed worker child */ @@ -345,3 +402,39 @@ reap_child(bool wait_for_child) return true; } + + + + +/* + * collect status from a completed worker subchild + */ +static bool +reap_subchild(bool wait_for_child) +{ +#ifndef WIN32 + int work_status; + pid_t child; +#else + int thread_num; + DWORD res; +#endif + + if (user_opts.jobs <= 1 || current_jobs == 0) + return false; + +#ifndef WIN32 + child = waitpid(-1, &work_status, wait_for_child ? 0 : WNOHANG); + if (child == (pid_t) -1) + pg_fatal("waitpid() failed: %s\n", strerror(errno)); + if (child == 0) + return false; /* no children, or no dead children */ + if (work_status != 0) + pg_fatal("child process exited abnormally: status %d\n", work_status); +#endif + + /* do this after job has been removed */ + current_jobs--; + + return true; +} diff --git a/src/bin/pg_upgrade/pg_upgrade.h b/src/bin/pg_upgrade/pg_upgrade.h index 22169f1002..adcb24ffea 100644 --- a/src/bin/pg_upgrade/pg_upgrade.h +++ b/src/bin/pg_upgrade/pg_upgrade.h @@ -282,6 +282,7 @@ typedef struct bool do_sync; /* flush changes to disk */ transferMode transfer_mode; /* copy files or link them? 
*/ int jobs; /* number of processes/threads to use */ + int jobs_per_disk; /* number of processes/threads to use per tablespace */ char *socketdir; /* directory to use for Unix sockets */ } UserOpts; @@ -450,4 +451,7 @@ void parallel_exec_prog(const char *log_file, const char *opt_log_file, void parallel_transfer_all_new_dbs(DbInfoArr *old_db_arr, DbInfoArr *new_db_arr, char *old_pgdata, char *new_pgdata, char *old_tablespace); + +void process_relfile_segment(FileNameMap *map, const char *suffix, bool vm_must_add_frozenbit, const char *old_file, const char *new_file); +void parallel_process_relfile_segment(FileNameMap *map, const char *suffix, bool vm_must_add_frozenbit, const char *old_file, const char *new_file); bool reap_child(bool wait_for_child); diff --git a/src/bin/pg_upgrade/relfilenode.c b/src/bin/pg_upgrade/relfilenode.c index 5dbefbceaf..8a7c49efaa 100644 --- a/src/bin/pg_upgrade/relfilenode.c +++ b/src/bin/pg_upgrade/relfilenode.c @@ -17,6 +17,7 @@ static void transfer_single_new_db(FileNameMap *maps, int size, char *old_tablespace); static void transfer_relfile(FileNameMap *map, const char *suffix, bool vm_must_add_frozenbit); +void process_relfile_segment(FileNameMap *map, const char *suffix, bool vm_must_add_frozenbit, const char *old_file, const char *new_file); /* @@ -232,30 +233,40 @@ transfer_relfile(FileNameMap *map, const char *type_suffix, bool vm_must_add_fro /* Copying files might take some time, so give feedback. 
*/ pg_log(PG_STATUS, "%s", old_file); - if (vm_must_add_frozenbit && strcmp(type_suffix, "_vm") == 0) + parallel_process_relfile_segment(map, type_suffix, vm_must_add_frozenbit, old_file, new_file); + } +} + + + +void +process_relfile_segment(FileNameMap *map, const char *type_suffix, bool vm_must_add_frozenbit, const char *old_file, const char *new_file) +{ + + if (vm_must_add_frozenbit && strcmp(type_suffix, "_vm") == 0) + { + /* Need to rewrite visibility map format */ + pg_log(PG_VERBOSE, "rewriting \"%s\" to \"%s\"\n", + old_file, new_file); + rewriteVisibilityMap(old_file, new_file, map->nspname, map->relname); + } + else + switch (user_opts.transfer_mode) { - /* Need to rewrite visibility map format */ - pg_log(PG_VERBOSE, "rewriting \"%s\" to \"%s\"\n", - old_file, new_file); - rewriteVisibilityMap(old_file, new_file, map->nspname, map->relname); + case TRANSFER_MODE_CLONE: + pg_log(PG_VERBOSE, "cloning \"%s\" to \"%s\"\n", + old_file, new_file); + cloneFile(old_file, new_file, map->nspname, map->relname); + break; + case TRANSFER_MODE_COPY: + pg_log(PG_VERBOSE, "copying \"%s\" to \"%s\"\n", + old_file, new_file); + copyFile(old_file, new_file, map->nspname, map->relname); + break; + case TRANSFER_MODE_LINK: + pg_log(PG_VERBOSE, "linking \"%s\" to \"%s\"\n", + old_file, new_file); + linkFile(old_file, new_file, map->nspname, map->relname); + break; } - else - switch (user_opts.transfer_mode) - { - case TRANSFER_MODE_CLONE: - pg_log(PG_VERBOSE, "cloning \"%s\" to \"%s\"\n", - old_file, new_file); - cloneFile(old_file, new_file, map->nspname, map->relname); - break; - case TRANSFER_MODE_COPY: - pg_log(PG_VERBOSE, "copying \"%s\" to \"%s\"\n", - old_file, new_file); - copyFile(old_file, new_file, map->nspname, map->relname); - break; - case TRANSFER_MODE_LINK: - pg_log(PG_VERBOSE, "linking \"%s\" to \"%s\"\n", - old_file, new_file); - linkFile(old_file, new_file, map->nspname, map->relname); - } - } } -- 2.20.1