From 657e797c3081c6628f8b2abe077109c7fe6067db Mon Sep 17 00:00:00 2001 From: Stas Kelvich Date: Wed, 25 Apr 2018 16:39:09 +0300 Subject: [PATCH 3/3] postgres_fdw support for global snapshots v3 --- contrib/postgres_fdw/Makefile | 9 + contrib/postgres_fdw/connection.c | 292 ++++++++++++++++-- contrib/postgres_fdw/postgres_fdw.c | 12 + contrib/postgres_fdw/postgres_fdw.h | 2 + .../postgres_fdw/t/001_bank_coordinator.pl | 264 ++++++++++++++++ .../postgres_fdw/t/002_bank_participant.pl | 240 ++++++++++++++ src/test/perl/PostgresNode.pm | 35 +++ 7 files changed, 827 insertions(+), 27 deletions(-) create mode 100644 contrib/postgres_fdw/t/001_bank_coordinator.pl create mode 100644 contrib/postgres_fdw/t/002_bank_participant.pl diff --git a/contrib/postgres_fdw/Makefile b/contrib/postgres_fdw/Makefile index 85394b4f1f..02ae067cd0 100644 --- a/contrib/postgres_fdw/Makefile +++ b/contrib/postgres_fdw/Makefile @@ -23,3 +23,12 @@ top_builddir = ../.. include $(top_builddir)/src/Makefile.global include $(top_srcdir)/contrib/contrib-global.mk endif + +# Global makefile will do temp-install for 'check'. Since REGRESS is defined, +# PGXS (included from contrib-global.mk or directly) will care to add +# postgres_fdw to it as EXTRA_INSTALL and build pg_regress. It will also +# actually run pg_regress, so the only thing left is tap tests. +check: tapcheck + +tapcheck: temp-install + $(prove_check) diff --git a/contrib/postgres_fdw/connection.c b/contrib/postgres_fdw/connection.c index a6509932dc..f6c43ad518 100644 --- a/contrib/postgres_fdw/connection.c +++ b/contrib/postgres_fdw/connection.c @@ -14,9 +14,11 @@ #include "postgres_fdw.h" +#include "access/global_snapshot.h" #include "access/htup_details.h" #include "catalog/pg_user_mapping.h" #include "access/xact.h" +#include "access/xlog.h" /* GetSystemIdentifier() */ #include "mb/pg_wchar.h" #include "miscadmin.h" #include "pgstat.h" @@ -24,6 +26,8 @@ #include "utils/hsearch.h" #include "utils/inval.h" #include "utils/memutils.h" +#include "utils/snapmgr.h" +#include "utils/snapshot.h" #include "utils/syscache.h" @@ -65,6 +69,21 @@ typedef struct ConnCacheEntry */ static HTAB *ConnectionHash = NULL; +/* + * FdwTransactionState + * + * Holds number of open remote transactions and shared state + * needed for all connection entries. + */ +typedef struct FdwTransactionState +{ + char *gid; + int nparticipants; + GlobalCSN global_csn; + bool two_phase_commit; +} FdwTransactionState; +static FdwTransactionState *fdwTransState; + /* for assigning cursor numbers and prepared statement numbers */ static unsigned int cursor_number = 0; static unsigned int prep_stmt_number = 0; @@ -72,6 +91,9 @@ static unsigned int prep_stmt_number = 0; /* tracks whether any work is needed in callback functions */ static bool xact_got_connection = false; +/* counter of prepared tx made by this backend */ +static int two_phase_xact_count = 0; + /* prototypes of private functions */ static PGconn *connect_pg_server(ForeignServer *server, UserMapping *user); static void disconnect_pg_server(ConnCacheEntry *entry); @@ -80,6 +102,7 @@ static void configure_remote_session(PGconn *conn); static void do_sql_command(PGconn *conn, const char *sql); static void begin_remote_xact(ConnCacheEntry *entry); static void pgfdw_xact_callback(XactEvent event, void *arg); +static void deallocate_prepared_stmts(ConnCacheEntry *entry); static void pgfdw_subxact_callback(SubXactEvent event, SubTransactionId mySubid, SubTransactionId parentSubid, @@ -136,6 +159,15 @@ GetConnection(UserMapping *user, bool will_prep_stmt) pgfdw_inval_callback, (Datum) 0); } + /* allocate FdwTransactionState */ + if (fdwTransState == NULL) + { + MemoryContext oldcxt; + oldcxt = MemoryContextSwitchTo(CacheMemoryContext); + fdwTransState = palloc0(sizeof(FdwTransactionState)); + MemoryContextSwitchTo(oldcxt); + } + /* Set flag that we did GetConnection during the current transaction */ xact_got_connection = true; @@ -388,7 +420,8 @@ configure_remote_session(PGconn *conn) } /* - * Convenience subroutine to issue a non-data-returning SQL command to remote + * Convenience subroutine to issue a non-data-returning SQL command or + * statement to remote node. */ static void do_sql_command(PGconn *conn, const char *sql) @@ -398,7 +431,8 @@ do_sql_command(PGconn *conn, const char *sql) if (!PQsendQuery(conn, sql)) pgfdw_report_error(ERROR, NULL, conn, false, sql); res = pgfdw_get_result(conn, sql); - if (PQresultStatus(res) != PGRES_COMMAND_OK) + if (PQresultStatus(res) != PGRES_COMMAND_OK && + PQresultStatus(res) != PGRES_TUPLES_OK) pgfdw_report_error(ERROR, res, conn, true, sql); PQclear(res); } @@ -426,6 +460,10 @@ begin_remote_xact(ConnCacheEntry *entry) elog(DEBUG3, "starting remote transaction on connection %p", entry->conn); + if (UseGlobalSnapshots && (!IsolationUsesXactSnapshot() || + IsolationIsSerializable())) + elog(ERROR, "Global snapshots support only REPEATABLE READ"); + if (IsolationIsSerializable()) sql = "START TRANSACTION ISOLATION LEVEL SERIALIZABLE"; else @@ -434,6 +472,23 @@ begin_remote_xact(ConnCacheEntry *entry) do_sql_command(entry->conn, sql); entry->xact_depth = 1; entry->changing_xact_state = false; + + if (UseGlobalSnapshots) + { + char import_sql[128]; + + /* Export our snapshot */ + if (fdwTransState->global_csn == 0) + fdwTransState->global_csn = ExportGlobalSnapshot(); + + snprintf(import_sql, sizeof(import_sql), + "SELECT pg_global_snapshot_import("UINT64_FORMAT")", + fdwTransState->global_csn); + + do_sql_command(entry->conn, import_sql); + } + + fdwTransState->nparticipants += 1; } /* @@ -644,6 +699,94 @@ pgfdw_report_error(int elevel, PGresult *res, PGconn *conn, PQclear(res); } +/* Callback typedef for BroadcastStmt */ +typedef bool (*BroadcastCmdResHandler) (PGresult *result, void *arg); + +/* Broadcast sql in parallel to all ConnectionHash entries */ +static bool +BroadcastStmt(char const * sql, unsigned expectedStatus, + BroadcastCmdResHandler handler, void *arg) +{ + HASH_SEQ_STATUS scan; + ConnCacheEntry *entry; + bool allOk = true; + + /* Broadcast sql */ + hash_seq_init(&scan, ConnectionHash); + while ((entry = (ConnCacheEntry *) hash_seq_search(&scan))) + { + pgfdw_reject_incomplete_xact_state_change(entry); + + if (entry->xact_depth > 0 && entry->conn != NULL) + { + if (!PQsendQuery(entry->conn, sql)) + { + PGresult *res = PQgetResult(entry->conn); + + elog(WARNING, "Failed to send command %s", sql); + pgfdw_report_error(WARNING, res, entry->conn, true, sql); + PQclear(res); + } + } + } + + /* Collect responses */ + hash_seq_init(&scan, ConnectionHash); + while ((entry = (ConnCacheEntry *) hash_seq_search(&scan))) + { + if (entry->xact_depth > 0 && entry->conn != NULL) + { + PGresult *result = PQgetResult(entry->conn); + + if (PQresultStatus(result) != expectedStatus || + (handler && !handler(result, arg))) + { + elog(WARNING, "Failed command %s: status=%d, expected status=%d", sql, PQresultStatus(result), expectedStatus); + pgfdw_report_error(ERROR, result, entry->conn, true, sql); + allOk = false; + } + PQclear(result); + PQgetResult(entry->conn); /* consume NULL result */ + } + } + + return allOk; +} + +/* Wrapper for broadcasting commands */ +static bool +BroadcastCmd(char const *sql) +{ + return BroadcastStmt(sql, PGRES_COMMAND_OK, NULL, NULL); +} + +/* Wrapper for broadcasting statements */ +static bool +BroadcastFunc(char const *sql) +{ + return BroadcastStmt(sql, PGRES_TUPLES_OK, NULL, NULL); +} + +/* Callback for selecting maximal csn */ +static bool +MaxCsnCB(PGresult *result, void *arg) +{ + char *resp; + GlobalCSN *max_csn = (GlobalCSN *) arg; + GlobalCSN csn = 0; + + resp = PQgetvalue(result, 0, 0); + + if (resp == NULL || (*resp) == '\0' || + sscanf(resp, UINT64_FORMAT, &csn) != 1) + return false; + + if (*max_csn < csn) + *max_csn = csn; + + return true; +} + /* * pgfdw_xact_callback --- cleanup at main-transaction end. */ @@ -657,6 +800,86 @@ pgfdw_xact_callback(XactEvent event, void *arg) if (!xact_got_connection) return; + /* Handle possible two-phase commit */ + if (event == XACT_EVENT_PARALLEL_PRE_COMMIT || event == XACT_EVENT_PRE_COMMIT) + { + bool include_local_tx = false; + + /* Should we take into account this node? */ + if (TransactionIdIsValid(GetCurrentTransactionIdIfAny())) + { + include_local_tx = true; + fdwTransState->nparticipants += 1; + } + + /* Switch to 2PC mode there were more than one participant */ + if (UseGlobalSnapshots && fdwTransState->nparticipants > 1) + fdwTransState->two_phase_commit = true; + + if (fdwTransState->two_phase_commit) + { + GlobalCSN max_csn = InProgressGlobalCSN, + my_csn = InProgressGlobalCSN; + bool res; + char *sql; + + fdwTransState->gid = psprintf("pgfdw:%lld:%llu:%d:%u:%d:%d", + (long long) GetCurrentTimestamp(), + (long long) GetSystemIdentifier(), + MyProcPid, + GetCurrentTransactionIdIfAny(), + ++two_phase_xact_count, + fdwTransState->nparticipants); + + /* Broadcast PREPARE */ + sql = psprintf("PREPARE TRANSACTION '%s'", fdwTransState->gid); + res = BroadcastCmd(sql); + if (!res) + goto error; + + /* Broadcast pg_global_snapshot_prepare() */ + if (include_local_tx) + my_csn = GlobalSnapshotPrepareCurrent(); + + sql = psprintf("SELECT pg_global_snapshot_prepare('%s')", + fdwTransState->gid); + res = BroadcastStmt(sql, PGRES_TUPLES_OK, MaxCsnCB, &max_csn); + if (!res) + goto error; + + /* select maximal global csn */ + if (include_local_tx && my_csn > max_csn) + max_csn = my_csn; + + /* Broadcast pg_global_snapshot_assign() */ + if (include_local_tx) + GlobalSnapshotAssignCsnCurrent(max_csn); + sql = psprintf("SELECT pg_global_snapshot_assign('%s',"UINT64_FORMAT")", + fdwTransState->gid, max_csn); + res = BroadcastFunc(sql); + +error: + if (!res) + { + sql = psprintf("ABORT PREPARED '%s'", fdwTransState->gid); + BroadcastCmd(sql); + elog(ERROR, "Failed to PREPARE transaction on remote node"); + } + + /* + * Do not fall down. Consequent COMMIT event will clean thing up. + */ + return; + } + } + + /* COMMIT open transaction of we were doing 2PC */ + if (fdwTransState->two_phase_commit && + (event == XACT_EVENT_PARALLEL_COMMIT || event == XACT_EVENT_COMMIT)) + { + BroadcastCmd(psprintf("COMMIT PREPARED '%s'", fdwTransState->gid)); + } + /* * Scan all connection cache entries to find open remote transactions, and * close them. @@ -664,8 +887,6 @@ pgfdw_xact_callback(XactEvent event, void *arg) hash_seq_init(&scan, ConnectionHash); while ((entry = (ConnCacheEntry *) hash_seq_search(&scan))) { - PGresult *res; - /* Ignore cache entry if no open connection right now */ if (entry->conn == NULL) continue; @@ -682,6 +903,7 @@ pgfdw_xact_callback(XactEvent event, void *arg) { case XACT_EVENT_PARALLEL_PRE_COMMIT: case XACT_EVENT_PRE_COMMIT: + Assert(!fdwTransState->two_phase_commit); /* * If abort cleanup previously failed for this connection, @@ -694,28 +916,7 @@ pgfdw_xact_callback(XactEvent event, void *arg) do_sql_command(entry->conn, "COMMIT TRANSACTION"); entry->changing_xact_state = false; - /* - * If there were any errors in subtransactions, and we - * made prepared statements, do a DEALLOCATE ALL to make - * sure we get rid of all prepared statements. This is - * annoying and not terribly bulletproof, but it's - * probably not worth trying harder. - * - * DEALLOCATE ALL only exists in 8.3 and later, so this - * constrains how old a server postgres_fdw can - * communicate with. We intentionally ignore errors in - * the DEALLOCATE, so that we can hobble along to some - * extent with older servers (leaking prepared statements - * as we go; but we don't really support update operations - * pre-8.3 anyway). - */ - if (entry->have_prep_stmt && entry->have_error) - { - res = PQexec(entry->conn, "DEALLOCATE ALL"); - PQclear(res); - } - entry->have_prep_stmt = false; - entry->have_error = false; + deallocate_prepared_stmts(entry); break; case XACT_EVENT_PRE_PREPARE: @@ -730,10 +931,15 @@ pgfdw_xact_callback(XactEvent event, void *arg) */ ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), - errmsg("cannot prepare a transaction that modified remote tables"))); + errmsg("cannot prepare a transaction that modified remote tables"))); break; case XACT_EVENT_PARALLEL_COMMIT: case XACT_EVENT_COMMIT: + if (fdwTransState->two_phase_commit) + deallocate_prepared_stmts(entry); + else /* Pre-commit should have closed the open transaction */ + elog(ERROR, "missed cleaning up connection during pre-commit"); + break; case XACT_EVENT_PREPARE: /* Pre-commit should have closed the open transaction */ elog(ERROR, "missed cleaning up connection during pre-commit"); @@ -829,6 +1035,38 @@ pgfdw_xact_callback(XactEvent event, void *arg) /* Also reset cursor numbering for next transaction */ cursor_number = 0; + + /* Reset fdwTransState */ + memset(fdwTransState, '\0', sizeof(FdwTransactionState)); +} + +/* + * If there were any errors in subtransactions, and we + * made prepared statements, do a DEALLOCATE ALL to make + * sure we get rid of all prepared statements. This is + * annoying and not terribly bulletproof, but it's + * probably not worth trying harder. + * + * DEALLOCATE ALL only exists in 8.3 and later, so this + * constrains how old a server postgres_fdw can + * communicate with. We intentionally ignore errors in + * the DEALLOCATE, so that we can hobble along to some + * extent with older servers (leaking prepared statements + * as we go; but we don't really support update operations + * pre-8.3 anyway). + */ +static void +deallocate_prepared_stmts(ConnCacheEntry *entry) +{ + PGresult *res; + + if (entry->have_prep_stmt && entry->have_error) + { + res = PQexec(entry->conn, "DEALLOCATE ALL"); + PQclear(res); + } + entry->have_prep_stmt = false; + entry->have_error = false; } /* diff --git a/contrib/postgres_fdw/postgres_fdw.c b/contrib/postgres_fdw/postgres_fdw.c index d22c974f0f..9e160efdef 100644 --- a/contrib/postgres_fdw/postgres_fdw.c +++ b/contrib/postgres_fdw/postgres_fdw.c @@ -269,6 +269,9 @@ typedef struct List *already_used; /* expressions already dealt with */ } ec_member_foreign_arg; +bool UseGlobalSnapshots; +void _PG_init(void); + /* * SQL functions */ @@ -5772,3 +5775,12 @@ find_em_expr_for_rel(EquivalenceClass *ec, RelOptInfo *rel) /* We didn't find any suitable equivalence class expression */ return NULL; } + +void +_PG_init(void) +{ + DefineCustomBoolVariable("postgres_fdw.use_global_snapshots", + "Use global snapshots for FDW transactions", NULL, + &UseGlobalSnapshots, false, PGC_USERSET, 0, NULL, + NULL, NULL); +} diff --git a/contrib/postgres_fdw/postgres_fdw.h b/contrib/postgres_fdw/postgres_fdw.h index 70b538e2f9..8cf5b12798 100644 --- a/contrib/postgres_fdw/postgres_fdw.h +++ b/contrib/postgres_fdw/postgres_fdw.h @@ -186,4 +186,6 @@ extern const char *get_jointype_name(JoinType jointype); extern bool is_builtin(Oid objectId); extern bool is_shippable(Oid objectId, Oid classId, PgFdwRelationInfo *fpinfo); +extern bool UseGlobalSnapshots; + #endif /* POSTGRES_FDW_H */ diff --git a/contrib/postgres_fdw/t/001_bank_coordinator.pl b/contrib/postgres_fdw/t/001_bank_coordinator.pl new file mode 100644 index 0000000000..1e31f33349 --- /dev/null +++ b/contrib/postgres_fdw/t/001_bank_coordinator.pl @@ -0,0 +1,264 @@ +use strict; +use warnings; + +use PostgresNode; +use TestLib; +use Test::More tests => 3; + +my $master = get_new_node("master"); +$master->init; +$master->append_conf('postgresql.conf', qq( + max_prepared_transactions = 30 + log_checkpoints = true + postgres_fdw.use_global_snapshots = on + track_global_snapshots = on + default_transaction_isolation = 'REPEATABLE READ' +)); +$master->start; + +my $shard1 = get_new_node("shard1"); +$shard1->init; +$shard1->append_conf('postgresql.conf', qq( + max_prepared_transactions = 30 + global_snapshot_defer_time = 15 + track_global_snapshots = on +)); +$shard1->start; + +my $shard2 = get_new_node("shard2"); +$shard2->init; +$shard2->append_conf('postgresql.conf', qq( + max_prepared_transactions = 30 + global_snapshot_defer_time = 15 + track_global_snapshots = on +)); +$shard2->start; + +############################################################################### +# Prepare nodes +############################################################################### + +$master->safe_psql('postgres', qq[ + CREATE EXTENSION postgres_fdw; + CREATE TABLE accounts(id integer primary key, amount integer); + CREATE TABLE global_transactions(tx_time timestamp); +]); + +foreach my $node ($shard1, $shard2) +{ + my $port = $node->port; + my $host = $node->host; + + $node->safe_psql('postgres', + "CREATE TABLE accounts(id integer primary key, amount integer)"); + + $master->safe_psql('postgres', qq[ + CREATE SERVER shard_$port FOREIGN DATA WRAPPER postgres_fdw options(dbname 'postgres', host '$host', port '$port'); + CREATE FOREIGN TABLE accounts_fdw_$port() inherits (accounts) server shard_$port options(table_name 'accounts'); + CREATE USER MAPPING for CURRENT_USER SERVER shard_$port; + ]) +} + +$shard1->safe_psql('postgres', qq[ + insert into accounts select 2*id-1, 0 from generate_series(1, 10010) as id; + CREATE TABLE local_transactions(tx_time timestamp); +]); + +$shard2->safe_psql('postgres', qq[ + insert into accounts select 2*id, 0 from generate_series(1, 10010) as id; + CREATE TABLE local_transactions(tx_time timestamp); +]); + +diag("master: @{[$master->connstr('postgres')]}"); +diag("shard1: @{[$shard1->connstr('postgres')]}"); +diag("shard2: @{[$shard2->connstr('postgres')]}"); + +############################################################################### +# pgbench scripts +############################################################################### + +my $bank = File::Temp->new(); +append_to_file($bank, q{ + \set id random(1, 20000) + BEGIN; + WITH upd AS (UPDATE accounts SET amount = amount - 1 WHERE id = :id RETURNING *) + INSERT into global_transactions SELECT now() FROM upd; + UPDATE accounts SET amount = amount + 1 WHERE id = (:id + 1); + COMMIT; +}); + +my $bank1 = File::Temp->new(); +append_to_file($bank1, q{ + \set id random(1, 10000) + BEGIN; + WITH upd AS (UPDATE accounts SET amount = amount - 1 WHERE id = (2*:id + 1) RETURNING *) + INSERT into local_transactions SELECT now() FROM upd; + UPDATE accounts SET amount = amount + 1 WHERE id = (2*:id + 3); + COMMIT; +}); + +my $bank2 = File::Temp->new(); +append_to_file($bank2, q{ + \set id random(1, 10000) + + BEGIN; + WITH upd AS (UPDATE accounts SET amount = amount - 1 WHERE id = 2*:id RETURNING *) + INSERT into local_transactions SELECT now() FROM upd; + UPDATE accounts SET amount = amount + 1 WHERE id = (2*:id + 2); + COMMIT; +}); + +############################################################################### +# Helpers +############################################################################### + +sub count_and_delete_rows +{ + my ($node, $table) = @_; + my $count; + + $count = $node->safe_psql('postgres',"select count(*) from $table"); + $node->safe_psql('postgres',"delete from $table"); + diag($node->name, ": completed $count transactions"); + return $count; +} + +############################################################################### +# Concurrent global transactions +############################################################################### + +my ($err, $rc); +my $started; +my $seconds = 30; +my $selects; +my $total = '0'; +my $oldtotal = '0'; +my $isolation_errors = 0; + + +my $pgb_handle; + +$pgb_handle = $master->pgbench_async(-n, -c => 5, -T => $seconds, -f => $bank, 'postgres' ); + +$started = time(); +$selects = 0; +while (time() - $started < $seconds) +{ + $total = $master->safe_psql('postgres', "select sum(amount) from accounts"); + if ( ($total ne $oldtotal) and ($total ne '') ) + { + $isolation_errors++; + $oldtotal = $total; + diag("Isolation error. Total = $total"); + } + if ($total ne '') { $selects++; } +} + +$master->pgbench_await($pgb_handle); + +# sanity check +diag("completed $selects selects"); +die "no actual transactions happend" unless ( $selects > 0 && + count_and_delete_rows($master, 'global_transactions') > 0); + +is($isolation_errors, 0, 'isolation between concurrent global transaction'); + +############################################################################### +# Concurrent global and local transactions +############################################################################### + +my ($pgb_handle1, $pgb_handle2, $pgb_handle3); + +# global txses +$pgb_handle1 = $master->pgbench_async(-n, -c => 5, -T => $seconds, -f => $bank, 'postgres' ); + +# concurrent local +$pgb_handle2 = $shard1->pgbench_async(-n, -c => 5, -T => $seconds, -f => $bank1, 'postgres' ); +$pgb_handle3 = $shard2->pgbench_async(-n, -c => 5, -T => $seconds, -f => $bank2, 'postgres' ); + +$started = time(); +$selects = 0; +$oldtotal = 0; +while (time() - $started < $seconds) +{ + $total = $master->safe_psql('postgres', "select sum(amount) from accounts"); + if ( ($total ne $oldtotal) and ($total ne '') ) + { + $isolation_errors++; + $oldtotal = $total; + diag("Isolation error. Total = $total"); + } + if ($total ne '') { $selects++; } +} + +diag("selects = $selects"); +$master->pgbench_await($pgb_handle1); +$shard1->pgbench_await($pgb_handle2); +$shard2->pgbench_await($pgb_handle3); + +diag("completed $selects selects"); +die "" unless ( $selects > 0 && + count_and_delete_rows($master, 'global_transactions') > 0 && + count_and_delete_rows($shard1, 'local_transactions') > 0 && + count_and_delete_rows($shard2, 'local_transactions') > 0); + +is($isolation_errors, 0, 'isolation between concurrent global and local transactions'); + + +############################################################################### +# Snapshot stability +############################################################################### + +my ($hashes, $hash1, $hash2); +my $stability_errors = 0; + +# global txses +$pgb_handle1 = $master->pgbench_async(-n, -c => 5, -T => $seconds, -f => $bank, 'postgres' ); +# concurrent local +$pgb_handle2 = $shard1->pgbench_async(-n, -c => 5, -T => $seconds, -f => $bank1, 'postgres' ); +$pgb_handle3 = $shard2->pgbench_async(-n, -c => 5, -T => $seconds, -f => $bank2, 'postgres' ); + +$selects = 0; +$started = time(); +while (time() - $started < $seconds) +{ + foreach my $node ($master, $shard1, $shard2) + { + ($hash1, $_, $hash2) = split "\n", $node->safe_psql('postgres', qq[ + begin isolation level repeatable read; + select md5(array_agg((t.*)::text)::text) from (select * from accounts order by id) as t; + select pg_sleep(3); + select md5(array_agg((t.*)::text)::text) from (select * from accounts order by id) as t; + commit; + ]); + + if ($hash1 ne $hash2) + { + diag("oops"); + $stability_errors++; + } + elsif ($hash1 eq '' or $hash2 eq '') + { + die; + } + else + { + $selects++; + } + } +} + +$master->pgbench_await($pgb_handle1); +$shard1->pgbench_await($pgb_handle2); +$shard2->pgbench_await($pgb_handle3); + +die "" unless ( $selects > 0 && + count_and_delete_rows($master, 'global_transactions') > 0 && + count_and_delete_rows($shard1, 'local_transactions') > 0 && + count_and_delete_rows($shard2, 'local_transactions') > 0); + +is($stability_errors, 0, 'snapshot is stable during concurrent global and local transactions'); + +$master->stop; +$shard1->stop; +$shard2->stop; diff --git a/contrib/postgres_fdw/t/002_bank_participant.pl b/contrib/postgres_fdw/t/002_bank_participant.pl new file mode 100644 index 0000000000..04a2f1ba85 --- /dev/null +++ b/contrib/postgres_fdw/t/002_bank_participant.pl @@ -0,0 +1,240 @@ +use strict; +use warnings; + +use PostgresNode; +use TestLib; +use Test::More tests => 3; + +my $shard1 = get_new_node("shard1"); +$shard1->init; +$shard1->append_conf('postgresql.conf', qq( + max_prepared_transactions = 30 + postgres_fdw.use_global_snapshots = on + global_snapshot_defer_time = 15 + track_global_snapshots = on + default_transaction_isolation = 'REPEATABLE READ' +)); +$shard1->start; + +my $shard2 = get_new_node("shard2"); +$shard2->init; +$shard2->append_conf('postgresql.conf', qq( + max_prepared_transactions = 30 + postgres_fdw.use_global_snapshots = on + global_snapshot_defer_time = 15 + track_global_snapshots = on + default_transaction_isolation = 'REPEATABLE READ' +)); +$shard2->start; + +############################################################################### +# Prepare nodes +############################################################################### + +my @shards = ($shard1, $shard2); + +foreach my $node (@shards) +{ + $node->safe_psql('postgres', qq[ + CREATE EXTENSION postgres_fdw; + CREATE TABLE accounts(id integer primary key, amount integer); + CREATE TABLE accounts_local() inherits(accounts); + CREATE TABLE global_transactions(tx_time timestamp); + CREATE TABLE local_transactions(tx_time timestamp); + ]); + + foreach my $neighbor (@shards) + { + next if ($neighbor eq $node); + + my $port = $neighbor->port; + my $host = $neighbor->host; + + $node->safe_psql('postgres', qq[ + CREATE SERVER shard_$port FOREIGN DATA WRAPPER postgres_fdw + options(dbname 'postgres', host '$host', port '$port'); + CREATE FOREIGN TABLE accounts_fdw_$port() inherits (accounts) + server shard_$port options(table_name 'accounts_local'); + CREATE USER MAPPING for CURRENT_USER SERVER shard_$port; + ]); + } +} + +$shard1->psql('postgres', "insert into accounts_local select 2*id-1, 0 from generate_series(1, 10010) as id;"); +$shard2->psql('postgres', "insert into accounts_local select 2*id, 0 from generate_series(1, 10010) as id;"); + +############################################################################### +# pgbench scripts +############################################################################### + +my $bank = File::Temp->new(); +append_to_file($bank, q{ + \set id random(1, 20000) + BEGIN; + WITH upd AS (UPDATE accounts SET amount = amount - 1 WHERE id = :id RETURNING *) + INSERT into global_transactions SELECT now() FROM upd; + UPDATE accounts SET amount = amount + 1 WHERE id = (:id + 1); + COMMIT; +}); + +############################################################################### +# Helpers +############################################################################### + +sub count_and_delete_rows +{ + my ($node, $table) = @_; + my $count; + + $count = $node->safe_psql('postgres',"select count(*) from $table"); + $node->safe_psql('postgres',"delete from $table"); + diag($node->name, ": completed $count transactions"); + return $count; +} + +############################################################################### +# Concurrent global transactions +############################################################################### + +my ($err, $rc); +my $started; +my $seconds = 30; +my $selects; +my $total = '0'; +my $oldtotal = '0'; +my $isolation_errors = 0; +my $i; + + +my ($pgb_handle1, $pgb_handle2); + +$pgb_handle1 = $shard1->pgbench_async(-n, -c => 5, -T => $seconds, -f => $bank, 'postgres' ); +$pgb_handle2 = $shard2->pgbench_async(-n, -c => 5, -T => $seconds, -f => $bank, 'postgres' ); + +$started = time(); +$selects = 0; +$i = 0; +while (time() - $started < $seconds) +{ + my $shard = $shard1; + foreach my $shard (@shards) + { + $total = $shard->safe_psql('postgres', "select sum(amount) from accounts"); + if ( ($total ne $oldtotal) and ($total ne '') ) + { + $isolation_errors++; + $oldtotal = $total; + diag("$i: Isolation error. Total = $total"); + } + if ($total ne '') { $selects++; } + } + $i++; +} + +$shard1->pgbench_await($pgb_handle1); +$shard2->pgbench_await($pgb_handle2); + +# sanity check +diag("completed $selects selects"); +die "no actual transactions happend" unless ( $selects > 0 && + count_and_delete_rows($shard1, 'global_transactions') > 0 && + count_and_delete_rows($shard2, 'global_transactions') > 0); + +is($isolation_errors, 0, 'isolation between concurrent global transaction'); + +############################################################################### +# And do the same after soft restart +############################################################################### + +$shard1->restart; +$shard2->restart; +$shard1->poll_query_until('postgres', "select 't'") + or die "Timed out waiting for shard1 to became online"; +$shard2->poll_query_until('postgres', "select 't'") + or die "Timed out waiting for shard2 to became online"; + +$seconds = 15; +$pgb_handle1 = $shard1->pgbench_async(-n, -c => 5, -T => $seconds, -f => $bank, 'postgres' ); +$pgb_handle2 = $shard2->pgbench_async(-n, -c => 5, -T => $seconds, -f => $bank, 'postgres' ); + +$started = time(); +$selects = 0; +$i = 0; + +while (time() - $started < $seconds) +{ + my $shard = $shard1; + foreach my $shard (@shards) + { + $total = $shard->safe_psql('postgres', "select sum(amount) from accounts"); + if ( ($total ne $oldtotal) and ($total ne '') ) + { + $isolation_errors++; + $oldtotal = $total; + diag("$i: Isolation error. Total = $total"); + } + if ($total ne '') { $selects++; } + } + $i++; +} + +$shard1->pgbench_await($pgb_handle1); +$shard2->pgbench_await($pgb_handle2); + +# sanity check +diag("completed $selects selects"); +die "no actual transactions happend" unless ( $selects > 0 && + count_and_delete_rows($shard1, 'global_transactions') > 0 && + count_and_delete_rows($shard2, 'global_transactions') > 0); + +is($isolation_errors, 0, 'isolation between concurrent global transaction after restart'); + +############################################################################### +# And do the same after hard restart +############################################################################### + +$shard1->teardown_node; +$shard2->teardown_node; +$shard1->start; +$shard2->start; +$shard1->poll_query_until('postgres', "select 't'") + or die "Timed out waiting for shard1 to became online"; +$shard2->poll_query_until('postgres', "select 't'") + or die "Timed out waiting for shard2 to became online"; + + +$seconds = 15; +$pgb_handle1 = $shard1->pgbench_async(-n, -c => 5, -T => $seconds, -f => $bank, 'postgres' ); +$pgb_handle2 = $shard2->pgbench_async(-n, -c => 5, -T => $seconds, -f => $bank, 'postgres' ); + +$started = time(); +$selects = 0; +$i = 0; + +while (time() - $started < $seconds) +{ + my $shard = $shard1; + foreach my $shard (@shards) + { + $total = $shard->safe_psql('postgres', "select sum(amount) from accounts"); + if ( ($total ne $oldtotal) and ($total ne '') ) + { + $isolation_errors++; + $oldtotal = $total; + diag("$i: Isolation error. Total = $total"); + } + if ($total ne '') { $selects++; } + } + $i++; +} + +$shard1->pgbench_await($pgb_handle1); +$shard2->pgbench_await($pgb_handle2); + +# sanity check +diag("completed $selects selects"); +die "no actual transactions happend" unless ( $selects > 0 && + count_and_delete_rows($shard1, 'global_transactions') > 0 && + count_and_delete_rows($shard2, 'global_transactions') > 0); + +is($isolation_errors, 0, 'isolation between concurrent global transaction after hard restart'); diff --git a/src/test/perl/PostgresNode.pm b/src/test/perl/PostgresNode.pm index 8a2c6fc122..3ae474be28 100644 --- a/src/test/perl/PostgresNode.pm +++ b/src/test/perl/PostgresNode.pm @@ -1840,6 +1840,41 @@ sub pg_recvlogical_upto } } +sub pgbench() +{ + my ($self, $node, @args) = @_; + my $pgbench_handle = $self->pgbench_async($node, @args); + $self->pgbench_await($pgbench_handle); +} + +sub pgbench_async() +{ + my ($self, @args) = @_; + + my ($in, $out, $err, $rc); + $in = ''; + $out = ''; + + my @pgbench_command = ( + 'pgbench', + -h => $self->host, + -p => $self->port, + @args + ); + my $handle = IPC::Run::start(\@pgbench_command, $in, $out); + return $handle; +} + +sub pgbench_await() +{ + my ($self, $pgbench_handle) = @_; + + # During run some pgbench threads can exit (for example due to + # serialization error). That will set non-zero returning code. + # So don't check return code here and leave it to a caller. + my $rc = IPC::Run::finish($pgbench_handle); +} + =pod =back -- 2.19.2