From 47c6e161de4fc9d2d6eff45f427ebf49b4c9d11c Mon Sep 17 00:00:00 2001 From: Yura Sokolov Date: Mon, 20 Dec 2021 11:48:10 +0300 Subject: [PATCH v1] Quick fix to duplicate result rows after Append path removal. It can happen that an Append path is created with the "parallel_aware" flag set while its single child is not parallel_aware. The Append path's parent (Gather or Gather Merge) assumes its child is parallel_aware, but after the Append path is removed, the Gather's child is no longer parallel_aware. Then, when Gather/Gather Merge decides to run the child in several workers, or in a worker plus the participating leader, it gathers duplicate result rows from the several invocations of the child path. With this fix, a single-child Append path copies its child's parallel_aware, parallel_safe and parallel_workers values, alongside the costs it already copied. A copied `num_workers == 0` triggers the assert `num_workers > 0` in the cost_gather_merge function, but relaxing this assert to `num_workers >= 0` doesn't lead to any runtime and/or logical error. Fixes bug 17335 https://postgr.es/m/flat/17335-4dc92e1aea3a78af%40postgresql.org --- src/backend/optimizer/path/costsize.c | 2 +- src/backend/optimizer/util/pathnode.c | 3 + .../expected/gather_removed_append.out | 131 ++++++++++++++++++ src/test/regress/parallel_schedule | 1 + .../regress/sql/gather_removed_append.sql | 82 +++++++++++ 5 files changed, 218 insertions(+), 1 deletion(-) create mode 100644 src/test/regress/expected/gather_removed_append.out create mode 100644 src/test/regress/sql/gather_removed_append.sql diff --git a/src/backend/optimizer/path/costsize.c b/src/backend/optimizer/path/costsize.c index 1e4d404f024..9949c3ab555 100644 --- a/src/backend/optimizer/path/costsize.c +++ b/src/backend/optimizer/path/costsize.c @@ -440,7 +440,7 @@ cost_gather_merge(GatherMergePath *path, PlannerInfo *root, * be overgenerous since the leader will do less work than other workers * in typical cases, but we'll go with it for now. 
*/ - Assert(path->num_workers > 0); + Assert(path->num_workers >= 0); N = (double) path->num_workers + 1; logN = LOG2(N); diff --git a/src/backend/optimizer/util/pathnode.c b/src/backend/optimizer/util/pathnode.c index af5e8df26b4..2ff4678937a 100644 --- a/src/backend/optimizer/util/pathnode.c +++ b/src/backend/optimizer/util/pathnode.c @@ -1340,6 +1340,9 @@ create_append_path(PlannerInfo *root, pathnode->path.startup_cost = child->startup_cost; pathnode->path.total_cost = child->total_cost; pathnode->path.pathkeys = child->pathkeys; + pathnode->path.parallel_aware = child->parallel_aware; + pathnode->path.parallel_safe = child->parallel_safe; + pathnode->path.parallel_workers = child->parallel_workers; } else cost_append(pathnode); diff --git a/src/test/regress/expected/gather_removed_append.out b/src/test/regress/expected/gather_removed_append.out new file mode 100644 index 00000000000..f6e861ce59d --- /dev/null +++ b/src/test/regress/expected/gather_removed_append.out @@ -0,0 +1,131 @@ +-- Test correctness of parallel query execution after removal +-- of Append path due to single non-trivial child. 
+DROP TABLE IF EXISTS gather_append_1, gather_append_2; +NOTICE: table "gather_append_1" does not exist, skipping +NOTICE: table "gather_append_2" does not exist, skipping +CREATE TABLE gather_append_1 ( + fk int, + f bool +); +INSERT INTO gather_append_1 (fk, f) SELECT i, i%50=0 from generate_series(1, 2000) as i; +CREATE INDEX gather_append_1_ix on gather_append_1 (f); +CREATE TABLE gather_append_2 ( + fk int, + val serial +); +INSERT INTO gather_append_2 (fk) SELECT fk from gather_append_1, generate_series(1, 5) as i; +ANALYZE gather_append_1, gather_append_2; +SET max_parallel_workers_per_gather = 0; +-- Find correct rows count +SELECT count(1) +FROM ( + SELECT fk FROM gather_append_1 WHERE f + UNION ALL + SELECT fk FROM gather_append_1 WHERE false +) as t +LEFT OUTER JOIN gather_append_2 +USING (fk); + count +------- + 200 +(1 row) + +SET parallel_setup_cost = 0; +SET parallel_tuple_cost = 0.1; +SET min_parallel_table_scan_size = 0; +SET max_parallel_workers_per_gather = 2; +SELECT count(1) +FROM ( + SELECT fk FROM gather_append_1 WHERE f + UNION ALL + SELECT fk FROM gather_append_1 WHERE false +) as t +LEFT OUTER JOIN gather_append_2 +USING (fk); + count +------- + 200 +(1 row) + +EXPLAIN (ANALYZE true, COSTS false, TIMING false, SUMMARY false, VERBOSE false, BUFFERS false) +SELECT count(1) +FROM ( + SELECT fk FROM gather_append_1 WHERE f + UNION ALL + SELECT fk FROM gather_append_1 WHERE false +) as t +LEFT OUTER JOIN gather_append_2 +USING (fk); + QUERY PLAN +--------------------------------------------------------------------------------------------------------- + Finalize Aggregate (actual rows=1 loops=1) + -> Gather (actual rows=1 loops=1) + Workers Planned: 1 + Workers Launched: 1 + Single Copy: true + -> Partial Aggregate (actual rows=1 loops=1) + -> Parallel Hash Left Join (actual rows=200 loops=1) + Hash Cond: (gather_append_1.fk = gather_append_2.fk) + -> Index Scan using gather_append_1_ix on gather_append_1 (actual rows=40 loops=1) + Index Cond: 
(f = true) + -> Parallel Hash (actual rows=10000 loops=1) + Buckets: 16384 Batches: 1 Memory Usage: 544kB + -> Parallel Seq Scan on gather_append_2 (actual rows=10000 loops=1) +(13 rows) + +-- Result rows in root node should be equal to non-parallel count +EXPLAIN (ANALYZE true, COSTS false, TIMING false, SUMMARY false, VERBOSE false, BUFFERS false) +SELECT val +FROM ( + SELECT fk FROM gather_append_1 WHERE f + UNION ALL + SELECT fk FROM gather_append_1 WHERE false +) as t +LEFT OUTER JOIN gather_append_2 +USING (fk) +ORDER BY val; + QUERY PLAN +--------------------------------------------------------------------------------------------------- + Gather Merge (actual rows=200 loops=1) + Workers Planned: 0 + Workers Launched: 0 + -> Sort (actual rows=200 loops=1) + Sort Key: gather_append_2.val + Sort Method: quicksort Memory: 25kB + -> Parallel Hash Left Join (actual rows=200 loops=1) + Hash Cond: (gather_append_1.fk = gather_append_2.fk) + -> Index Scan using gather_append_1_ix on gather_append_1 (actual rows=40 loops=1) + Index Cond: (f = true) + -> Parallel Hash (actual rows=10000 loops=1) + Buckets: 16384 Batches: 1 Memory Usage: 519kB + -> Parallel Seq Scan on gather_append_2 (actual rows=10000 loops=1) +(13 rows) + +-- Result rows in root node should be equal to non-parallel count +EXPLAIN (ANALYZE true, COSTS false, TIMING false, SUMMARY false, VERBOSE false, BUFFERS false) +SELECT val +FROM ( + SELECT fk FROM gather_append_1 WHERE f + UNION ALL + SELECT fk FROM gather_append_1 WHERE false +) as t +LEFT OUTER JOIN gather_append_2 +USING (fk) +ORDER BY val; + QUERY PLAN +--------------------------------------------------------------------------------------------------- + Gather Merge (actual rows=200 loops=1) + Workers Planned: 0 + Workers Launched: 0 + -> Sort (actual rows=200 loops=1) + Sort Key: gather_append_2.val + Sort Method: quicksort Memory: 25kB + -> Parallel Hash Left Join (actual rows=200 loops=1) + Hash Cond: (gather_append_1.fk = 
gather_append_2.fk) + -> Index Scan using gather_append_1_ix on gather_append_1 (actual rows=40 loops=1) + Index Cond: (f = true) + -> Parallel Hash (actual rows=10000 loops=1) + Buckets: 16384 Batches: 1 Memory Usage: 519kB + -> Parallel Seq Scan on gather_append_2 (actual rows=10000 loops=1) +(13 rows) + diff --git a/src/test/regress/parallel_schedule b/src/test/regress/parallel_schedule index 5b0c73d7e37..84f2f81255d 100644 --- a/src/test/regress/parallel_schedule +++ b/src/test/regress/parallel_schedule @@ -100,6 +100,7 @@ test: rules psql psql_crosstab amutils stats_ext collate.linux.utf8 test: select_parallel test: write_parallel test: vacuum_parallel +test: gather_removed_append # no relation related tests can be put in this group test: publication subscription diff --git a/src/test/regress/sql/gather_removed_append.sql b/src/test/regress/sql/gather_removed_append.sql new file mode 100644 index 00000000000..df1b796a4f6 --- /dev/null +++ b/src/test/regress/sql/gather_removed_append.sql @@ -0,0 +1,82 @@ +-- Test correctness of parallel query execution after removal +-- of Append path due to single non-trivial child. 
+ +DROP TABLE IF EXISTS gather_append_1, gather_append_2; + +CREATE TABLE gather_append_1 ( + fk int, + f bool +); + +INSERT INTO gather_append_1 (fk, f) SELECT i, i%50=0 from generate_series(1, 2000) as i; + +CREATE INDEX gather_append_1_ix on gather_append_1 (f); + +CREATE TABLE gather_append_2 ( + fk int, + val serial +); + +INSERT INTO gather_append_2 (fk) SELECT fk from gather_append_1, generate_series(1, 5) as i; + +ANALYZE gather_append_1, gather_append_2; + +SET max_parallel_workers_per_gather = 0; + +-- Find correct rows count +SELECT count(1) +FROM ( + SELECT fk FROM gather_append_1 WHERE f + UNION ALL + SELECT fk FROM gather_append_1 WHERE false +) as t +LEFT OUTER JOIN gather_append_2 +USING (fk); + +SET parallel_setup_cost = 0; +SET parallel_tuple_cost = 0.1; +SET min_parallel_table_scan_size = 0; +SET max_parallel_workers_per_gather = 2; + +SELECT count(1) +FROM ( + SELECT fk FROM gather_append_1 WHERE f + UNION ALL + SELECT fk FROM gather_append_1 WHERE false +) as t +LEFT OUTER JOIN gather_append_2 +USING (fk); + +EXPLAIN (ANALYZE true, COSTS false, TIMING false, SUMMARY false, VERBOSE false, BUFFERS false) +SELECT count(1) +FROM ( + SELECT fk FROM gather_append_1 WHERE f + UNION ALL + SELECT fk FROM gather_append_1 WHERE false +) as t +LEFT OUTER JOIN gather_append_2 +USING (fk); + +-- Result rows in root node should be equal to non-parallel count +EXPLAIN (ANALYZE true, COSTS false, TIMING false, SUMMARY false, VERBOSE false, BUFFERS false) +SELECT val +FROM ( + SELECT fk FROM gather_append_1 WHERE f + UNION ALL + SELECT fk FROM gather_append_1 WHERE false +) as t +LEFT OUTER JOIN gather_append_2 +USING (fk) +ORDER BY val; + +-- Result rows in root node should be equal to non-parallel count +EXPLAIN (ANALYZE true, COSTS false, TIMING false, SUMMARY false, VERBOSE false, BUFFERS false) +SELECT val +FROM ( + SELECT fk FROM gather_append_1 WHERE f + UNION ALL + SELECT fk FROM gather_append_1 WHERE false +) as t +LEFT OUTER JOIN gather_append_2 +USING 
(fk) +ORDER BY val; -- 2.34.1