diff --git a/src/backend/commands/copyfromparse.c b/src/backend/commands/copyfromparse.c index baf328b620..996386fdd4 100644 --- a/src/backend/commands/copyfromparse.c +++ b/src/backend/commands/copyfromparse.c @@ -269,6 +269,7 @@ CopyGetData(CopyFromState cstate, void *databuf, int minread, int maxread) readmessage: HOLD_CANCEL_INTERRUPTS(); + HOLD_CHECKING_REMOTE_SERVERS_INTERRUPTS(); pq_startmsgread(); mtype = pq_getbyte(); if (mtype == EOF) @@ -300,6 +301,7 @@ CopyGetData(CopyFromState cstate, void *databuf, int minread, int maxread) ereport(ERROR, (errcode(ERRCODE_CONNECTION_FAILURE), errmsg("unexpected EOF on client connection with an open transaction"))); + RESUME_CHECKING_REMOTE_SERVERS_INTERRUPTS(); /* NOTE(review): skipped when the ereport(ERROR) above fires -- confirm the holdoff count is reset during error recovery */ RESUME_CANCEL_INTERRUPTS(); /* ... and process it */ switch (mtype) diff --git a/src/backend/foreign/foreign.c b/src/backend/foreign/foreign.c index 294e22c78c..9779eadf01 100644 --- a/src/backend/foreign/foreign.c +++ b/src/backend/foreign/foreign.c @@ -26,7 +26,13 @@ #include "utils/memutils.h" #include "utils/rel.h" #include "utils/syscache.h" +#include "utils/timeout.h" +/* state for periodic remote-server checks: the interval GUC and the list of registered callbacks */ +int remote_servers_connection_check_interval = 0; +static CheckingRemoteServersCallbackItem *fdw_callbacks = NULL; +/* number of TryEnableRemoteServerCheckingTimeout() calls not yet balanced by TryDisableRemoteServerCheckingTimeout() */ +static int timeout_counter = 0; /* * GetForeignDataWrapper - look up the foreign-data wrapper by OID. @@ -836,3 +842,111 @@ GetExistingLocalJoinPath(RelOptInfo *joinrel) } return NULL; } + +/* + * Register a callback for checking remote servers; the returned list item is allocated in TopMemoryContext. + * + * This function is intended for use by FDW extensions. + * Registration alone does not start the timeout; see TryEnableRemoteServerCheckingTimeout(). 
+ */ +CheckingRemoteServersCallbackItem * +RegisterCheckingRemoteServersCallback(CheckingRemoteServersCallback callback, + void *arg) +{ + CheckingRemoteServersCallbackItem *item; + + item = (CheckingRemoteServersCallbackItem *) + MemoryContextAlloc(TopMemoryContext, + sizeof(CheckingRemoteServersCallbackItem)); + item->callback = callback; + item->counter = 0; + item->arg = arg; + item->next = fdw_callbacks; + fdw_callbacks = item; + + return item; +} + +/* + * Invoke each registered callback whose counter is positive, stopping at the first one that reports a server. + * Returns the server reported down by a callback, or NULL if no callback reported one. + */ +ForeignServer * +CallCheckingRemoteServersCallbacks(void) +{ + CheckingRemoteServersCallbackItem *item; + ForeignServer *server = NULL; + + for (item = fdw_callbacks; item; item = item->next) + { + if (item->counter > 0) + { + server = item->callback(item->arg); + if (server != NULL) + break; + } + } + + return server; +} + +/* + * Increment the global and per-item counters, and start the timeout if the check interval is positive and the timeout is not already active. Returns the new global counter. + */ +int +TryEnableRemoteServerCheckingTimeout(CheckingRemoteServersCallbackItem *item) +{ + timeout_counter++; + item->counter++; + + if (remote_servers_connection_check_interval > 0 && + !get_timeout_active(CHECKING_REMOTE_SERVERS_TIMEOUT)) + enable_timeout_after(CHECKING_REMOTE_SERVERS_TIMEOUT, + remote_servers_connection_check_interval); + + return timeout_counter; +} + +/* + * Decrement the global and per-item counters, and disable the timeout once the global counter reaches zero. Returns the new global counter. + * + * This function must be called after TryEnableRemoteServerCheckingTimeout(). 
+ */ +int +TryDisableRemoteServerCheckingTimeout(CheckingRemoteServersCallbackItem *item) +{ + Assert(timeout_counter > 0 && item->counter > 0); + timeout_counter--; + item->counter--; + + if (timeout_counter == 0 && + get_timeout_active(CHECKING_REMOTE_SERVERS_TIMEOUT)) + disable_timeout(CHECKING_REMOTE_SERVERS_TIMEOUT, false); + + return timeout_counter; +} + +void +assign_remote_servers_connection_check_interval(int newval, + void *extra) +{ + /* Nothing to (re)schedule if no callbacks are registered */ + if (fdw_callbacks == NULL) + return; + + if (get_timeout_active(CHECKING_REMOTE_SERVERS_TIMEOUT)) + { + if (newval == 0) + disable_timeout(CHECKING_REMOTE_SERVERS_TIMEOUT, false); + + /* + * Otherwise leave the running timeout alone; the new value + * is picked up when ProcessInterrupts() re-arms it. + */ + return; + } + + /* Timeout is idle: start it only if the new interval is positive and some caller holds an enable count */ + if (newval > 0 && timeout_counter > 0) + enable_timeout_after(CHECKING_REMOTE_SERVERS_TIMEOUT, newval); +} diff --git a/src/backend/tcop/postgres.c b/src/backend/tcop/postgres.c index fda2e9360e..b99fcf7b48 100644 --- a/src/backend/tcop/postgres.c +++ b/src/backend/tcop/postgres.c @@ -44,6 +44,7 @@ #include "commands/prepare.h" #include "common/pg_prng.h" #include "executor/spi.h" +#include "foreign/foreign.h" #include "jit/jit.h" #include "libpq/libpq.h" #include "libpq/pqformat.h" @@ -350,6 +351,7 @@ SocketBackend(StringInfo inBuf) * Get message type code from the frontend. 
*/ HOLD_CANCEL_INTERRUPTS(); + HOLD_CHECKING_REMOTE_SERVERS_INTERRUPTS(); pq_startmsgread(); qtype = pq_getbyte(); @@ -456,6 +458,7 @@ SocketBackend(StringInfo inBuf) */ if (pq_getmessage(inBuf, maxmsglen)) return EOF; /* suitable message already logged */ + RESUME_CHECKING_REMOTE_SERVERS_INTERRUPTS(); RESUME_CANCEL_INTERRUPTS(); return qtype; @@ -3225,6 +3228,39 @@ ProcessInterrupts(void) errmsg("connection to client lost"))); } + if (CheckingRemoteServersTimeoutPending) + { + if (CheckingRemoteServersHoldoffCount != 0 || DoingCommandRead) + { + /* + * Defer the check while a message or command is being read: leave the pending flag set and re-raise InterruptPending so the request is not lost. + */ + InterruptPending = true; + } + else + { + CheckingRemoteServersTimeoutPending = false; + + /* Run the callbacks and re-arm the timeout only while the interval is positive; the ERROR below is raised before the re-arm. */ + if (remote_servers_connection_check_interval > 0) + { + ForeignServer *downedServer = CallCheckingRemoteServersCallbacks(); + if (downedServer != NULL) + { + LockErrorCleanup(); + ereport(ERROR, + errcode(ERRCODE_CONNECTION_FAILURE), + errmsg("foreign server \"%s\" disconnected due to the health-check failure", + downedServer->servername), + errdetail("Foreign server might be down."), + errhint("Please check the server and network health.")); + } + enable_timeout_after(CHECKING_REMOTE_SERVERS_TIMEOUT, + remote_servers_connection_check_interval); + } + } + } + /* * If a recovery conflict happens while we are waiting for input from the * client, the client is presumably just sitting idle in a transaction, diff --git a/src/backend/utils/init/globals.c b/src/backend/utils/init/globals.c index c26a1a73df..7efa9cff84 100644 --- a/src/backend/utils/init/globals.c +++ b/src/backend/utils/init/globals.c @@ -36,9 +36,11 @@ volatile sig_atomic_t IdleInTransactionSessionTimeoutPending = false; volatile sig_atomic_t IdleSessionTimeoutPending = false; volatile sig_atomic_t ProcSignalBarrierPending = false; volatile sig_atomic_t LogMemoryContextPending = false; +volatile sig_atomic_t 
CheckingRemoteServersTimeoutPending = false; volatile uint32 InterruptHoldoffCount = 0; volatile uint32 QueryCancelHoldoffCount = 0; volatile uint32 CritSectionCount = 0; +volatile uint32 CheckingRemoteServersHoldoffCount = 0; /* nonzero makes ProcessInterrupts() defer remote-server checks */ int MyProcPid; pg_time_t MyStartTime; diff --git a/src/backend/utils/init/postinit.c b/src/backend/utils/init/postinit.c index 5b9ed2f6f5..9b0b4ddd58 100644 --- a/src/backend/utils/init/postinit.c +++ b/src/backend/utils/init/postinit.c @@ -75,6 +75,7 @@ static void LockTimeoutHandler(void); static void IdleInTransactionSessionTimeoutHandler(void); static void IdleSessionTimeoutHandler(void); static void ClientCheckTimeoutHandler(void); +static void CheckingRemoteServersTimeoutHandler(void); static bool ThereIsAtLeastOneRole(void); static void process_startup_options(Port *port, bool am_superuser); static void process_settings(Oid databaseid, Oid roleid); @@ -628,6 +629,7 @@ InitPostgres(const char *in_dbname, Oid dboid, const char *username, IdleInTransactionSessionTimeoutHandler); RegisterTimeout(IDLE_SESSION_TIMEOUT, IdleSessionTimeoutHandler); RegisterTimeout(CLIENT_CONNECTION_CHECK_TIMEOUT, ClientCheckTimeoutHandler); + RegisterTimeout(CHECKING_REMOTE_SERVERS_TIMEOUT, CheckingRemoteServersTimeoutHandler); } /* @@ -1246,6 +1248,14 @@ ClientCheckTimeoutHandler(void) SetLatch(MyLatch); } +static void +CheckingRemoteServersTimeoutHandler(void) +{ + CheckingRemoteServersTimeoutPending = true; /* examined and cleared by ProcessInterrupts() */ + InterruptPending = true; + SetLatch(MyLatch); +} + /* * Returns true if at least one role is defined in this database cluster. 
*/ diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c index b3fd42e0f1..6dfc171448 100644 --- a/src/backend/utils/misc/guc.c +++ b/src/backend/utils/misc/guc.c @@ -52,6 +52,7 @@ #include "commands/vacuum.h" #include "commands/variable.h" #include "common/string.h" +#include "foreign/foreign.h" #include "funcapi.h" #include "jit/jit.h" #include "libpq/auth.h" @@ -106,6 +107,7 @@ #include "utils/queryjumble.h" #include "utils/rls.h" #include "utils/snapmgr.h" +#include "utils/timeout.h" #include "utils/tzparser.h" #include "utils/inval.h" #include "utils/varlena.h" @@ -3600,6 +3602,18 @@ static struct config_int ConfigureNamesInt[] = NULL, NULL, NULL }, + + { + {"remote_servers_connection_check_interval", PGC_USERSET, CONN_AUTH_SETTINGS, + gettext_noop("Sets the time interval between checks for disconnection of remote servers."), + NULL, + GUC_UNIT_MS + }, + &remote_servers_connection_check_interval, + 0, 0, INT_MAX, + NULL, assign_remote_servers_connection_check_interval, NULL + }, + /* End-of-list marker */ { {NULL, 0, 0, NULL, NULL}, NULL, 0, 0, 0, NULL, NULL, NULL diff --git a/src/backend/utils/misc/postgresql.conf.sample b/src/backend/utils/misc/postgresql.conf.sample index 817d5f5324..b97dc30514 100644 --- a/src/backend/utils/misc/postgresql.conf.sample +++ b/src/backend/utils/misc/postgresql.conf.sample @@ -90,6 +90,10 @@ # disconnection while running queries; # 0 for never +#remote_servers_connection_check_interval = 0 # time between checks for + # foreign server disconnection; + # 0 for never + # - Authentication - #authentication_timeout = 1min # 1s-600s diff --git a/src/include/foreign/foreign.h b/src/include/foreign/foreign.h index 75538110fc..922e63922b 100644 --- a/src/include/foreign/foreign.h +++ b/src/include/foreign/foreign.h @@ -81,4 +81,26 @@ extern List *GetForeignColumnOptions(Oid relid, AttrNumber attnum); extern Oid get_foreign_data_wrapper_oid(const char *fdwname, bool missing_ok); extern Oid 
get_foreign_server_oid(const char *servername, bool missing_ok); + +/* Types, functions and variables for periodic checks of remote servers. */ +typedef ForeignServer* (*CheckingRemoteServersCallback) (void *arg); +typedef struct CheckingRemoteServersCallbackItem +{ + struct CheckingRemoteServersCallbackItem *next; /* next item in the list of registered callbacks */ + CheckingRemoteServersCallback callback; /* returns the server found to be down, or NULL */ + int counter; /* enable count; the callback is invoked only while this is > 0 */ + void *arg; /* opaque argument passed to callback */ +} CheckingRemoteServersCallbackItem; + +extern CheckingRemoteServersCallbackItem * +RegisterCheckingRemoteServersCallback(CheckingRemoteServersCallback callback, + void *arg); +extern ForeignServer *CallCheckingRemoteServersCallbacks(void); + +extern int TryEnableRemoteServerCheckingTimeout(CheckingRemoteServersCallbackItem *item); +extern int TryDisableRemoteServerCheckingTimeout(CheckingRemoteServersCallbackItem *item); + +extern int remote_servers_connection_check_interval; +extern void assign_remote_servers_connection_check_interval(int newval, + void *extra); #endif /* FOREIGN_H */ diff --git a/src/include/miscadmin.h b/src/include/miscadmin.h index 02276d3edd..0319fe1605 100644 --- a/src/include/miscadmin.h +++ b/src/include/miscadmin.h @@ -98,10 +98,13 @@ extern PGDLLIMPORT volatile sig_atomic_t LogMemoryContextPending; extern PGDLLIMPORT volatile sig_atomic_t CheckClientConnectionPending; extern PGDLLIMPORT volatile sig_atomic_t ClientConnectionLost; +extern PGDLLIMPORT volatile sig_atomic_t CheckingRemoteServersTimeoutPending; + /* these are marked volatile because they are examined by signal handlers: */ extern PGDLLIMPORT volatile uint32 InterruptHoldoffCount; extern PGDLLIMPORT volatile uint32 QueryCancelHoldoffCount; extern PGDLLIMPORT volatile uint32 CritSectionCount; +extern PGDLLIMPORT volatile uint32 CheckingRemoteServersHoldoffCount; /* in tcop/postgres.c */ extern void ProcessInterrupts(void); @@ -126,7 +129,7 @@ do { \ /* Is ProcessInterrupts() guaranteed to clear InterruptPending? 
*/ #define INTERRUPTS_CAN_BE_PROCESSED() \ (InterruptHoldoffCount == 0 && CritSectionCount == 0 && \ - QueryCancelHoldoffCount == 0) + QueryCancelHoldoffCount == 0 && CheckingRemoteServersHoldoffCount == 0) #define HOLD_INTERRUPTS() (InterruptHoldoffCount++) @@ -152,6 +155,13 @@ do { \ CritSectionCount--; \ } while(0) +#define HOLD_CHECKING_REMOTE_SERVERS_INTERRUPTS() (CheckingRemoteServersHoldoffCount++) /* while the count is nonzero, ProcessInterrupts() defers remote-server checks */ + +#define RESUME_CHECKING_REMOTE_SERVERS_INTERRUPTS() \ +do { \ + Assert(CheckingRemoteServersHoldoffCount > 0); /* each RESUME must pair with an earlier HOLD */ \ + CheckingRemoteServersHoldoffCount--; \ +} while(0) /***************************************************************************** * globals.h -- * diff --git a/src/include/utils/timeout.h b/src/include/utils/timeout.h index 099f91c61d..7f02a8b491 100644 --- a/src/include/utils/timeout.h +++ b/src/include/utils/timeout.h @@ -34,6 +34,7 @@ typedef enum TimeoutId IDLE_SESSION_TIMEOUT, CLIENT_CONNECTION_CHECK_TIMEOUT, STARTUP_PROGRESS_TIMEOUT, + CHECKING_REMOTE_SERVERS_TIMEOUT, /* periodic check of remote servers */ /* First user-definable timeout reason */ USER_TIMEOUT, /* Maximum number of timeout reasons */