From 0a8212f4f8a5c3f65dc45ff220f785607b87096a Mon Sep 17 00:00:00 2001 From: Hayato Kuroda Date: Fri, 27 Jan 2023 03:17:30 +0000 Subject: [PATCH v42 1/2] postgres_fdw: Allow postgres_fdw_get_connections() to check connection If requested, postgres_fdw_get_connections() can verify the status of connections that are establieshed by postgres_fdw. This check wil be done by polling the socket. This feature is currently available only on systems that support the non-standard POLLRDHUP extension to the poll system call, including Linux. "closed" column is set to false if existing connection is not closed by the remote peer. --- contrib/postgres_fdw/Makefile | 2 +- contrib/postgres_fdw/connection.c | 137 +++++++++++++++++- .../postgres_fdw/expected/postgres_fdw.out | 50 ++++++- contrib/postgres_fdw/meson.build | 1 + .../postgres_fdw/postgres_fdw--1.1--1.2.sql | 17 +++ contrib/postgres_fdw/postgres_fdw.control | 2 +- contrib/postgres_fdw/sql/postgres_fdw.sql | 42 ++++++ doc/src/sgml/postgres-fdw.sgml | 50 +++++-- 8 files changed, 274 insertions(+), 27 deletions(-) create mode 100644 contrib/postgres_fdw/postgres_fdw--1.1--1.2.sql diff --git a/contrib/postgres_fdw/Makefile b/contrib/postgres_fdw/Makefile index b9fa699305..88fdce40d6 100644 --- a/contrib/postgres_fdw/Makefile +++ b/contrib/postgres_fdw/Makefile @@ -14,7 +14,7 @@ PG_CPPFLAGS = -I$(libpq_srcdir) SHLIB_LINK_INTERNAL = $(libpq) EXTENSION = postgres_fdw -DATA = postgres_fdw--1.0.sql postgres_fdw--1.0--1.1.sql +DATA = postgres_fdw--1.0.sql postgres_fdw--1.0--1.1.sql postgres_fdw--1.1--1.2.sql REGRESS = postgres_fdw query_cancel diff --git a/contrib/postgres_fdw/connection.c b/contrib/postgres_fdw/connection.c index 33e8054f64..667d8d4a82 100644 --- a/contrib/postgres_fdw/connection.c +++ b/contrib/postgres_fdw/connection.c @@ -12,6 +12,10 @@ */ #include "postgres.h" +#if HAVE_POLL_H +#include +#endif + #include "access/htup_details.h" #include "access/xact.h" #include "catalog/pg_user_mapping.h" @@ -107,10 +111,17 @@ static uint32 pgfdw_we_get_result = 0; (entry)->xact_depth, (entry)->xact_depth); \ } while(0) +enum pgfdwVersion +{ + PGFDW_V1_0 = 0, + PGFDW_V1_2, +} pgfdwVersion; + /* * SQL functions */ PG_FUNCTION_INFO_V1(postgres_fdw_get_connections); +PG_FUNCTION_INFO_V1(postgres_fdw_get_connections_1_2); PG_FUNCTION_INFO_V1(postgres_fdw_disconnect); PG_FUNCTION_INFO_V1(postgres_fdw_disconnect_all); @@ -159,6 +170,12 @@ static void pgfdw_security_check(const char **keywords, const char **values, UserMapping *user, PGconn *conn); static bool UserMappingPasswordRequired(UserMapping *user); static bool disconnect_cached_connections(Oid serverid); +static Datum postgres_fdw_get_connections_internal(FunctionCallInfo fcinfo, + enum pgfdwVersion api_version); + +/* Low layer-like functions. They are used for verifying connections. */ +static int pgfdw_conn_check(PGconn *conn); +static bool pgfdw_conn_checkable(void); /* * Get a PGconn which can be used to execute queries on the remote PostgreSQL @@ -1978,22 +1995,31 @@ pgfdw_finish_abort_cleanup(List *pending_entries, List *cancel_requested, } /* - * List active foreign server connections. + * Internal function used by postgres_fdw_get_connections variants. * - * This function takes no input parameter and returns setof record made of - * following values: + * If the api_version is 1.0, this function takes no input parameter and + * returns setof record made of following values: * - server_name - server name of active connection. In case the foreign server * is dropped but still the connection is active, then the server name will * be NULL in output. * - valid - true/false representing whether the connection is valid or not. * Note that the connections can get invalidated in pgfdw_inval_callback. * + * If the version is 1.2 and later, this function takes an input parameter, + * which indicates the need for a health check. Regarding the returned record, + * this returns two additional values: + * - used_in_xact - indicates whether the server has been used in a transaction + * or not. + * - closed - true if the local session seems to be disconnected from other + * servers. + * * No records are returned when there are no cached connections at all. */ -Datum -postgres_fdw_get_connections(PG_FUNCTION_ARGS) +static Datum +postgres_fdw_get_connections_internal(FunctionCallInfo fcinfo, + enum pgfdwVersion api_version) { -#define POSTGRES_FDW_GET_CONNECTIONS_COLS 2 +#define POSTGRES_FDW_GET_CONNECTIONS_COLS 4 ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo; HASH_SEQ_STATUS scan; ConnCacheEntry *entry; @@ -2061,12 +2087,51 @@ postgres_fdw_get_connections(PG_FUNCTION_ARGS) values[1] = BoolGetDatum(!entry->invalidated); + /* Add additional attributes if the version is 1.2 or later */ + if (api_version == PGFDW_V1_2) + { + bool require_verify = PG_GETARG_BOOL(0); + + /* Has this server been used in the transaction? */ + values[2] = BoolGetDatum(entry->xact_depth > 0); + + /* + * If requested and possible, check the status of remote connection + * from the backend process and return the result. Otherwise returns + * NULL. + */ + if (require_verify && pgfdw_conn_checkable()) + values[3] = BoolGetDatum(pgfdw_conn_check(entry->conn) != 0); + else + nulls[3] = true; + } + tuplestore_putvalues(rsinfo->setResult, rsinfo->setDesc, values, nulls); } PG_RETURN_VOID(); } +/* + * List active foreign server connections. + * + * The SQL API of this function has changed since version 1.2, allowing + * verification of the status of remote connections. The actual implementation + * was moved to the internal function, and we can switch by the api_version to + * support the old SQL declaration. + */ +Datum +postgres_fdw_get_connections_1_2(PG_FUNCTION_ARGS) +{ + return postgres_fdw_get_connections_internal(fcinfo, PGFDW_V1_2); +} + +Datum +postgres_fdw_get_connections(PG_FUNCTION_ARGS) +{ + return postgres_fdw_get_connections_internal(fcinfo, PGFDW_V1_0); +} + /* * Disconnect the specified cached connections. * @@ -2192,3 +2257,63 @@ disconnect_cached_connections(Oid serverid) return result; } + +/* + * Check whether the socket peer closed the connection or not. + * + * Returns >0 if input connection is bad or remote peer seems to be closed, + * 0 if it is valid, and -1 if an error occurred. + */ +static int +pgfdw_conn_check(PGconn *conn) +{ + int sock = PQsocket(conn); + + if (!pgfdw_conn_checkable()) + return 0; + + if (!conn || PQstatus(conn) != CONNECTION_OK || sock == PGINVALID_SOCKET) + return -1; + +#if (defined(HAVE_POLL) && defined(POLLRDHUP)) + { + /* + * This platform seems to have poll(2), and can wait POLLRDHUP event. + * So construct pollfd and directly call it. + */ + struct pollfd input_fd; + int result; + + input_fd.fd = sock; + input_fd.events = POLLRDHUP; + input_fd.revents = 0; + + do + result = poll(&input_fd, 1, 0); + while (result < 0 && errno == EINTR); + + if (result < 0) + return -1; + + return input_fd.revents; + } +#else + /* Do not support socket checking on this platform, return 0 */ + return 0; +#endif +} + +/* + * Check whether pgfdw_conn_check() can work on this platform. + * + * Returns true if this can use pgfdw_conn_check(), otherwise false. + */ +static bool +pgfdw_conn_checkable(void) +{ +#if (defined(HAVE_POLL) && defined(POLLRDHUP)) + return true; +#else + return false; +#endif +} diff --git a/contrib/postgres_fdw/expected/postgres_fdw.out b/contrib/postgres_fdw/expected/postgres_fdw.out index 39b2b317e8..22fba7aeb5 100644 --- a/contrib/postgres_fdw/expected/postgres_fdw.out +++ b/contrib/postgres_fdw/expected/postgres_fdw.out @@ -10464,10 +10464,10 @@ drop cascades to foreign table ft7 -- should be output as invalid connections. Also the server name for -- loopback3 should be NULL because the server was dropped. SELECT * FROM postgres_fdw_get_connections() ORDER BY 1; - server_name | valid --------------+------- - loopback | f - | f + server_name | valid | used_in_xact | closed +-------------+-------+--------------+-------- + loopback | f | t | + | f | t | (2 rows) -- The invalid connections get closed in pgfdw_xact_callback during commit. @@ -12286,3 +12286,45 @@ ANALYZE analyze_table; -- cleanup DROP FOREIGN TABLE analyze_ftable; DROP TABLE analyze_table; +-- =================================================================== +-- test for postgres_fdw_get_connections function with require_verify = true +-- =================================================================== +-- Disable debug_discard_caches in order to manage remote connections +SET debug_discard_caches TO '0'; +-- The text of the error might vary across platforms, so only show SQLSTATE. +\set VERBOSITY sqlstate +-- Disconnect once and set application_name to an arbitrary value +SELECT 1 FROM postgres_fdw_disconnect_all(); + ?column? +---------- + 1 +(1 row) + +ALTER SERVER loopback OPTIONS (SET application_name 'healthcheck'); +-- Define procedure for testing verify functions +CREATE PROCEDURE test_verify_function() AS $$ +DECLARE + result boolean; +BEGIN + PERFORM 1 FROM ft1 LIMIT 1; + + -- Terminate the remote backend process + PERFORM pg_terminate_backend(pid, 180000) FROM pg_stat_activity + WHERE application_name = 'healthcheck'; + + SELECT closed INTO result FROM postgres_fdw_get_connections(true); + + -- If result is TRUE or NULL, we succeeded to detect the disconnection or + -- it could not be done on this platform. Raise an message. + IF result IS NOT false THEN + RAISE INFO 'postgres_fdw_get_connections could detect the disconnection, or health check cannot be used on this platform'; + END IF; +END; +$$ LANGUAGE plpgsql; +-- ..And call above function +CALL test_verify_function(); +INFO: 00000 +ERROR: 08006 +-- Clean up +\set VERBOSITY default +RESET debug_discard_caches; diff --git a/contrib/postgres_fdw/meson.build b/contrib/postgres_fdw/meson.build index f0803ee077..3014086ba6 100644 --- a/contrib/postgres_fdw/meson.build +++ b/contrib/postgres_fdw/meson.build @@ -26,6 +26,7 @@ install_data( 'postgres_fdw.control', 'postgres_fdw--1.0.sql', 'postgres_fdw--1.0--1.1.sql', + 'postgres_fdw--1.1--1.2.sql', kwargs: contrib_data_args, ) diff --git a/contrib/postgres_fdw/postgres_fdw--1.1--1.2.sql b/contrib/postgres_fdw/postgres_fdw--1.1--1.2.sql new file mode 100644 index 0000000000..9afbc5c39b --- /dev/null +++ b/contrib/postgres_fdw/postgres_fdw--1.1--1.2.sql @@ -0,0 +1,17 @@ +/* contrib/postgres_fdw/postgres_fdw--1.1--1.2.sql */ + +-- complain if script is sourced in psql, rather than via ALTER EXTENSION +\echo Use "ALTER EXTENSION postgres_fdw UPDATE TO '1.2'" to load this file. \quit + +/* First we have to remove it from the extension */ +ALTER EXTENSION postgres_fdw DROP FUNCTION postgres_fdw_get_connections (); + +/* Then we can drop it */ +DROP FUNCTION postgres_fdw_get_connections (); + +CREATE FUNCTION postgres_fdw_get_connections + (IN require_verify boolean DEFAULT false, OUT server_name text, + OUT valid boolean, OUT used_in_xact boolean, OUT closed boolean) +RETURNS SETOF record +AS 'MODULE_PATHNAME', 'postgres_fdw_get_connections_1_2' +LANGUAGE C STRICT PARALLEL RESTRICTED; diff --git a/contrib/postgres_fdw/postgres_fdw.control b/contrib/postgres_fdw/postgres_fdw.control index d489382064..a4b800be4f 100644 --- a/contrib/postgres_fdw/postgres_fdw.control +++ b/contrib/postgres_fdw/postgres_fdw.control @@ -1,5 +1,5 @@ # postgres_fdw extension comment = 'foreign-data wrapper for remote PostgreSQL servers' -default_version = '1.1' +default_version = '1.2' module_pathname = '$libdir/postgres_fdw' relocatable = true diff --git a/contrib/postgres_fdw/sql/postgres_fdw.sql b/contrib/postgres_fdw/sql/postgres_fdw.sql index 8be9f99c19..653d049946 100644 --- a/contrib/postgres_fdw/sql/postgres_fdw.sql +++ b/contrib/postgres_fdw/sql/postgres_fdw.sql @@ -4235,3 +4235,45 @@ ANALYZE analyze_table; -- cleanup DROP FOREIGN TABLE analyze_ftable; DROP TABLE analyze_table; + +-- =================================================================== +-- test for postgres_fdw_get_connections function with require_verify = true +-- =================================================================== + +-- Disable debug_discard_caches in order to manage remote connections +SET debug_discard_caches TO '0'; + +-- The text of the error might vary across platforms, so only show SQLSTATE. +\set VERBOSITY sqlstate + +-- Disconnect once and set application_name to an arbitrary value +SELECT 1 FROM postgres_fdw_disconnect_all(); +ALTER SERVER loopback OPTIONS (SET application_name 'healthcheck'); + +-- Define procedure for testing verify functions +CREATE PROCEDURE test_verify_function() AS $$ +DECLARE + result boolean; +BEGIN + PERFORM 1 FROM ft1 LIMIT 1; + + -- Terminate the remote backend process + PERFORM pg_terminate_backend(pid, 180000) FROM pg_stat_activity + WHERE application_name = 'healthcheck'; + + SELECT closed INTO result FROM postgres_fdw_get_connections(true); + + -- If result is TRUE or NULL, we succeeded to detect the disconnection or + -- it could not be done on this platform. Raise an message. + IF result IS NOT false THEN + RAISE INFO 'postgres_fdw_get_connections could detect the disconnection, or health check cannot be used on this platform'; + END IF; +END; +$$ LANGUAGE plpgsql; + +-- ..And call above function +CALL test_verify_function(); + +-- Clean up +\set VERBOSITY default +RESET debug_discard_caches; diff --git a/doc/src/sgml/postgres-fdw.sgml b/doc/src/sgml/postgres-fdw.sgml index 1a600382e2..c4b52ddefc 100644 --- a/doc/src/sgml/postgres-fdw.sgml +++ b/doc/src/sgml/postgres-fdw.sgml @@ -777,27 +777,47 @@ OPTIONS (ADD password_required 'false'); - postgres_fdw_get_connections(OUT server_name text, OUT valid boolean) returns setof record + postgres_fdw_get_connections(IN require_verify boolean DEFAULT false, + OUT server_name text, OUT valid boolean, OUT used_in_xact boolean, OUT closed boolean) + returns setof record This function returns the foreign server names of all the open connections that postgres_fdw established from - the local session to the foreign servers. It also returns whether - each connection is valid or not. false is returned - if the foreign server connection is used in the current local - transaction but its foreign server or user mapping is changed or - dropped (Note that server name of an invalid connection will be - NULL if the server is dropped), - and then such invalid connection will be closed at - the end of that transaction. true is returned - otherwise. If there are no open connections, no record is returned. + the local session to the foreign servers. It also returns whether each + connection is valid and has been used within a transaction. + valid is returned as false if the + foreign server connection is used in the current local transaction but + its foreign server or user mapping is changed or dropped (Note that + server name of an invalid connection will be NULL if + the server is dropped), and then such invalid connection will be closed + at the end of that transaction. The attribute is set to true + otherwise. + + + If require_verify is set to true, + the function checks the status of remote connections from the local + session to the foreign server. This check is performed by polling the + socket, which allows long-running transactions to be aborted sooner if + the kernel reports that the connection is closed. This feature is + currently available only on systems that support the non-standard + POLLRDHUP extension to the poll system + call, including Linux. closed attribute indicates the + result of this check. false means the existing + connection is not closed by the remote peer. true + means if either checked connection has been closed. + NULL means this feature is not available on + this platform or require_verify is false. + + + If there are no open connections, no record is returned. Example usage of the function: -postgres=# SELECT * FROM postgres_fdw_get_connections() ORDER BY 1; - server_name | valid --------------+------- - loopback1 | t - loopback2 | f +postgres=*# SELECT * FROM postgres_fdw_get_connections(true) ORDER BY 1; + server_name | valid | used_in_xact | closed +-------------+-------+--------------+-------- + loopback1 | t | t | f + loopback2 | t | t | f -- 2.43.0