From 80e2894d3289cebd9c12d457620bf2965cc0252d Mon Sep 17 00:00:00 2001 From: Torsten Foertsch Date: Sat, 16 Nov 2024 12:42:11 +0100 Subject: [PATCH v3] Allow pg_recvlogical to create temp slots With this patch pg_recvlogical can be called with the --temporary-slot option together with --create-slot and --start. If called that way, the created slot exists only for the duration of the connection. If the connection is dropped and reestablished by pg_recvlogical, a new temp slot by the same name is created. If the slot exists and --if-not-exists is not passed, then pg_recvlogical fails. If the slot exists and --if-not-exists is given, the slot will be used. In addition a few tests have been added for previously untested options. Discussion: /message-id/CAKkG4_%3DoMpa-AXhw9m044ZH5YdneNFTp6WxG_kEPA0cTkfiMNQ%40mail.gmail.com --- doc/src/sgml/ref/pg_recvlogical.sgml | 25 +++++++ src/bin/pg_basebackup/pg_recvlogical.c | 33 ++++++++-- src/bin/pg_basebackup/t/030_pg_recvlogical.pl | 65 +++++++++++++++++++ 3 files changed, 117 insertions(+), 6 deletions(-) diff --git a/doc/src/sgml/ref/pg_recvlogical.sgml b/doc/src/sgml/ref/pg_recvlogical.sgml index 95eb14b635..72e9952966 100644 --- a/doc/src/sgml/ref/pg_recvlogical.sgml +++ b/doc/src/sgml/ref/pg_recvlogical.sgml @@ -268,6 +268,31 @@ PostgreSQL documentation + + + + + Create a temporary replication slot. This option is only useful if + specified together with and + . A temporary slot is automatically dropped + when the database connection is dropped. + + + If used together with and if a + permanent slot by the requested name exists, that slot is used instead + of creating a new one. That permanent slot is then not automatically + removed when the connection is dropped. + + + Unless is specified, + pg_recvlogical will try to reconnect automatically + when the server connection is lost. If a temporary slot is requested, + the slot will be recreated first. If at that time another slot by the + same name exists, creation will fail. + + + + diff --git a/src/bin/pg_basebackup/pg_recvlogical.c b/src/bin/pg_basebackup/pg_recvlogical.c index 42b2a7bb9d..6ec02e427f 100644 --- a/src/bin/pg_basebackup/pg_recvlogical.c +++ b/src/bin/pg_basebackup/pg_recvlogical.c @@ -48,6 +48,7 @@ static int fsync_interval = 10 * 1000; /* 10 sec = default */ static XLogRecPtr startpos = InvalidXLogRecPtr; static XLogRecPtr endpos = InvalidXLogRecPtr; static bool do_create_slot = false; +static bool slot_is_temporary = false; static bool slot_exists_ok = false; static bool do_start_slot = false; static bool do_drop_slot = false; @@ -102,6 +103,7 @@ usage(void) printf(_(" -s, --status-interval=SECS\n" " time between status packets sent to server (default: %d)\n"), (standby_message_timeout / 1000)); printf(_(" -S, --slot=SLOTNAME name of the logical replication slot\n")); + printf(_(" --temporary-slot the slot created exists until the connection is dropped\n")); printf(_(" -t, --two-phase enable decoding of prepared transactions when creating a slot\n")); printf(_(" -v, --verbose output verbose messages\n")); printf(_(" -V, --version output version information, then exit\n")); @@ -214,7 +216,7 @@ StreamLogicalLog(void) char *copybuf = NULL; TimestampTz last_status = -1; int i; - PQExpBuffer query; + PQExpBuffer query = NULL; XLogRecPtr cur_record_lsn; output_written_lsn = InvalidXLogRecPtr; @@ -225,10 +227,24 @@ StreamLogicalLog(void) * Connect in replication mode to the server */ if (!conn) + { conn = GetConnection(); - if (!conn) - /* Error message already written in GetConnection() */ - return; + if (!conn) + /* Error message already written in GetConnection() */ + return; + + /* Recreate a replication slot. */ + if (do_create_slot && slot_is_temporary) + { + if (verbose) + pg_log_info("recreating replication slot \"%s\"", replication_slot); + + if (!CreateReplicationSlot(conn, replication_slot, plugin, slot_is_temporary, + false, false, slot_exists_ok, two_phase)) + goto error; + startpos = InvalidXLogRecPtr; + } + } /* * Start the replication @@ -654,7 +670,8 @@ error: PQfreemem(copybuf); copybuf = NULL; } - destroyPQExpBuffer(query); + if (query != NULL) + destroyPQExpBuffer(query); PQfinish(conn); conn = NULL; } @@ -717,6 +734,7 @@ main(int argc, char **argv) {"start", no_argument, NULL, 2}, {"drop-slot", no_argument, NULL, 3}, {"if-not-exists", no_argument, NULL, 4}, + {"temporary-slot", no_argument, NULL, 5}, {NULL, 0, NULL, 0} }; int c; @@ -845,6 +863,9 @@ main(int argc, char **argv) case 4: slot_exists_ok = true; break; + case 5: + slot_is_temporary = true; + break; default: /* getopt_long already emitted a complaint */ @@ -979,7 +1000,7 @@ main(int argc, char **argv) if (verbose) pg_log_info("creating replication slot \"%s\"", replication_slot); - if (!CreateReplicationSlot(conn, replication_slot, plugin, false, + if (!CreateReplicationSlot(conn, replication_slot, plugin, slot_is_temporary, false, false, slot_exists_ok, two_phase)) exit(1); startpos = InvalidXLogRecPtr; diff --git a/src/bin/pg_basebackup/t/030_pg_recvlogical.pl b/src/bin/pg_basebackup/t/030_pg_recvlogical.pl index 8432e5660d..0b46574fc1 100644 --- a/src/bin/pg_basebackup/t/030_pg_recvlogical.pl +++ b/src/bin/pg_basebackup/t/030_pg_recvlogical.pl @@ -65,6 +65,23 @@ $node->command_ok( ], 'replayed a transaction'); +$node->command_fails( + [ + 'pg_recvlogical', '-S', + 'test', '-d', + $node->connstr('postgres'), '--create-slot' + ], + 'slot cannot be created again'); + +$node->command_ok( + [ + 'pg_recvlogical', '-S', + 'test', '-d', + $node->connstr('postgres'), '--create-slot', + '--if-not-exists' + ], + 'if-not-exists'); + $node->command_ok( [ 'pg_recvlogical', '-S', @@ -73,6 +90,54 @@ $node->command_ok( ], 'slot dropped'); +# slot() returns a hash with all values set to the empty string if the +# slot does not exist. +$slot = $node->slot('test'); +is(0+grep({$_ ne ''} values %$slot), 0, 'slot does not exist anymore'); + +$node->command_ok( + [ + 'pg_recvlogical', '-S', 'test', '-d', $node->connstr('postgres'), + '--create-slot', '--temporary-slot', + '--start', '--endpos', "$nextlsn", '--no-loop', '-f', '-' + ], + 'create temporary slot'); + +$slot = $node->slot('test'); +is(0+grep({$_ ne ''} values %$slot), 0, 'temp slot is gone when connection is dropped'); + +$node->command_ok( + [ + 'pg_recvlogical', '-S', + 'test', '-d', + $node->connstr('postgres'), '--create-slot', + ], + 'create slot again'); + +$node->command_fails( + [ + 'pg_recvlogical', '-S', 'test', '-d', $node->connstr('postgres'), + '--create-slot', '--temporary-slot', + '--start', '--endpos', "$nextlsn", '--no-loop', '-f', '-' + ], + 'create temporary slot fails if target slot exists'); + +$node->command_ok( + [ + 'pg_recvlogical', '-S', 'test', '-d', $node->connstr('postgres'), + '--create-slot', '--temporary-slot', '--if-not-exists', + '--start', '--endpos', "$nextlsn", '--no-loop', '-f', '-' + ], + 'create temporary with --if-not-exists'); + +$node->command_ok( + [ + 'pg_recvlogical', '-S', + 'test', '-d', + $node->connstr('postgres'), '--drop-slot' + ], + 'slot dropped again'); + #test with two-phase option enabled $node->command_ok( [ -- 2.34.1