From 437b098b2ec6334affb2d818cf9154c7f170e2dc Mon Sep 17 00:00:00 2001 From: Jelte Fennema Date: Wed, 12 Jan 2022 09:52:05 +0100 Subject: [PATCH] Add non-blocking version of PQcancel This patch does four things: 1. Change the PQrequestCancel implementation to use the regular connection establishment code, to support all connection options including encryption. 2. Add PQrequestCancelStart which is a thread-safe and non-blocking version of this new PQrequestCancel implementation. 3. Add PQconnectComplete, which completes a connection started by PQrequestCancelStart. This is useful if you want a thread-safe but blocking cancel (without having a need for signal-safety). 4. Use this new cancellation API everywhere in the codebase where signal-safety is not a necessity. This change un-deprecates PQrequestCancel, since now there's actually an advantage to using it over PQcancel. It also includes user-facing documentation for all the newly added functions. The existing PQcancel API uses blocking IO. This makes PQcancel impossible to use in an event loop based codebase, without blocking the event loop until the call returns. PQrequestCancelStart can now be used instead, to have a non-blocking way of sending cancel requests. The postgres_fdw cancellation code has been modified to make use of this. This patch also includes a test for all of libpq's cancellation APIs. 
The test can be easily run like this: cd src/test/modules/libpq_pipeline make && ./libpq_pipeline cancel --- contrib/dblink/dblink.c | 28 +- contrib/postgres_fdw/connection.c | 93 ++++- .../postgres_fdw/expected/postgres_fdw.out | 15 + contrib/postgres_fdw/sql/postgres_fdw.sql | 8 + doc/src/sgml/libpq.sgml | 212 +++++++++-- src/fe_utils/connect_utils.c | 10 +- src/interfaces/libpq/exports.txt | 2 + src/interfaces/libpq/fe-connect.c | 341 +++++++++++++++--- src/interfaces/libpq/fe-misc.c | 15 +- src/interfaces/libpq/fe-secure-openssl.c | 2 +- src/interfaces/libpq/fe-secure.c | 6 + src/interfaces/libpq/libpq-fe.h | 9 +- src/interfaces/libpq/libpq-int.h | 4 + src/test/isolation/isolationtester.c | 28 +- .../modules/libpq_pipeline/libpq_pipeline.c | 274 +++++++++++++- 15 files changed, 894 insertions(+), 153 deletions(-) diff --git a/contrib/dblink/dblink.c b/contrib/dblink/dblink.c index a561d1d652..b572cf6d5b 100644 --- a/contrib/dblink/dblink.c +++ b/contrib/dblink/dblink.c @@ -1379,22 +1379,30 @@ PG_FUNCTION_INFO_V1(dblink_cancel_query); Datum dblink_cancel_query(PG_FUNCTION_ARGS) { - int res; PGconn *conn; - PGcancel *cancel; - char errbuf[256]; + PGconn *cancelConn; + char *msg; dblink_init(); conn = dblink_get_named_conn(text_to_cstring(PG_GETARG_TEXT_PP(0))); - cancel = PQgetCancel(conn); - - res = PQcancel(cancel, errbuf, 256); - PQfreeCancel(cancel); + cancelConn = PQrequestCancelStart(conn); + if (PQstatus(cancelConn) == CONNECTION_BAD) + { + msg = pchomp(PQerrorMessage(cancelConn)); + PQfinish(cancelConn); + PG_RETURN_TEXT_P(cstring_to_text(msg)); + } - if (res == 1) - PG_RETURN_TEXT_P(cstring_to_text("OK")); + if (PQconnectComplete(cancelConn)) + { + msg = "OK"; + } else - PG_RETURN_TEXT_P(cstring_to_text(errbuf)); + { + msg = pchomp(PQerrorMessage(cancelConn)); + } + PQfinish(cancelConn); + PG_RETURN_TEXT_P(cstring_to_text(msg)); } diff --git a/contrib/postgres_fdw/connection.c b/contrib/postgres_fdw/connection.c index 061ffaf329..fa47274d15 100644 --- 
a/contrib/postgres_fdw/connection.c +++ b/contrib/postgres_fdw/connection.c @@ -1264,35 +1264,98 @@ pgfdw_reset_xact_state(ConnCacheEntry *entry, bool toplevel) static bool pgfdw_cancel_query(PGconn *conn) { - PGcancel *cancel; - char errbuf[256]; PGresult *result = NULL; - TimestampTz endtime; - bool timed_out; /* * If it takes too long to cancel the query and discard the result, assume * the connection is dead. */ - endtime = TimestampTzPlusMilliseconds(GetCurrentTimestamp(), 30000); + TimestampTz endtime = TimestampTzPlusMilliseconds(GetCurrentTimestamp(), 30000); + bool timed_out = false; + bool failed = false; + PGconn *cancel_conn = PQrequestCancelStart(conn); - /* - * Issue cancel request. Unfortunately, there's no good way to limit the - * amount of time that we might block inside PQgetCancel(). - */ - if ((cancel = PQgetCancel(conn))) + + if (PQstatus(cancel_conn) == CONNECTION_BAD) + { + ereport(WARNING, + (errcode(ERRCODE_CONNECTION_FAILURE), + errmsg("could not send cancel request: %s", + pchomp(PQerrorMessage(cancel_conn))))); + return false; + } + + /* In what follows, do not leak any PGconn on an error. */ + PG_TRY(); + { + while (true) + { + TimestampTz now = GetCurrentTimestamp(); + long cur_timeout; + PostgresPollingStatusType pollres = PQconnectPoll(cancel_conn); + int waitEvents = WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH; + + if (pollres == PGRES_POLLING_OK) + { + break; + } + + /* If timeout has expired, give up, else get sleep time. 
*/ + cur_timeout = TimestampDifferenceMilliseconds(now, endtime); + if (cur_timeout <= 0) + { + timed_out = true; + failed = true; + goto exit; + } + + switch (pollres) + { + case PGRES_POLLING_READING: + waitEvents |= WL_SOCKET_READABLE; + break; + case PGRES_POLLING_WRITING: + waitEvents |= WL_SOCKET_WRITEABLE; + break; + default: + failed = true; + goto exit; + } + + /* Sleep until there's something to do */ + WaitLatchOrSocket(MyLatch, waitEvents, PQsocket(cancel_conn), + cur_timeout, PG_WAIT_EXTENSION); + ResetLatch(MyLatch); + + CHECK_FOR_INTERRUPTS(); + } +exit: ; + } + PG_CATCH(); { - if (!PQcancel(cancel, errbuf, sizeof(errbuf))) + PQfinish(cancel_conn); + PG_RE_THROW(); + } + PG_END_TRY(); + + if (failed) + { + if (timed_out) + { + ereport(WARNING, + (errmsg("could not cancel request due to timeout"))); + } + else { ereport(WARNING, (errcode(ERRCODE_CONNECTION_FAILURE), errmsg("could not send cancel request: %s", - errbuf))); - PQfreeCancel(cancel); - return false; + pchomp(PQerrorMessage(cancel_conn))))); } - PQfreeCancel(cancel); + PQfinish(cancel_conn); + return false; } + PQfinish(cancel_conn); /* Get and discard the result of the query. 
*/ if (pgfdw_get_cleanup_result(conn, endtime, &result, &timed_out)) diff --git a/contrib/postgres_fdw/expected/postgres_fdw.out b/contrib/postgres_fdw/expected/postgres_fdw.out index 44457f930c..6d108b56ef 100644 --- a/contrib/postgres_fdw/expected/postgres_fdw.out +++ b/contrib/postgres_fdw/expected/postgres_fdw.out @@ -2567,6 +2567,21 @@ SELECT t1.c1, t2.c2 FROM v4 t1 LEFT JOIN ft5 t2 ON (t1.c1 = t2.c1) ORDER BY t1.c (10 rows) ALTER VIEW v4 OWNER TO regress_view_owner; +-- Make sure this big CROSS JOIN query is pushed down +EXPLAIN (VERBOSE, COSTS OFF) SELECT count(*) FROM ft1 CROSS JOIN ft2 CROSS JOIN ft4 CROSS JOIN ft5; + QUERY PLAN +--------------------------------------------------------------------------------------------------------------------------------------------------------------------- + Foreign Scan + Output: (count(*)) + Relations: Aggregate on ((((public.ft1) INNER JOIN (public.ft2)) INNER JOIN (public.ft4)) INNER JOIN (public.ft5)) + Remote SQL: SELECT count(*) FROM ((("S 1"."T 1" r1 INNER JOIN "S 1"."T 1" r2 ON (TRUE)) INNER JOIN "S 1"."T 3" r4 ON (TRUE)) INNER JOIN "S 1"."T 4" r6 ON (TRUE)) +(4 rows) + +-- Make sure query cancellation works +SET statement_timeout = '10ms'; +select count(*) from ft1 CROSS JOIN ft2 CROSS JOIN ft4 CROSS JOIN ft5; -- this takes very long +ERROR: canceling statement due to statement timeout +RESET statement_timeout; -- cleanup DROP OWNED BY regress_view_owner; DROP ROLE regress_view_owner; diff --git a/contrib/postgres_fdw/sql/postgres_fdw.sql b/contrib/postgres_fdw/sql/postgres_fdw.sql index 92d1212027..7e02ed6803 100644 --- a/contrib/postgres_fdw/sql/postgres_fdw.sql +++ b/contrib/postgres_fdw/sql/postgres_fdw.sql @@ -326,6 +326,7 @@ DELETE FROM loct_empty; ANALYZE ft_empty; EXPLAIN (VERBOSE, COSTS OFF) SELECT * FROM ft_empty ORDER BY c1; + -- =================================================================== -- WHERE with remotely-executable conditions -- 
=================================================================== @@ -681,6 +682,13 @@ SELECT t1.c1, t2.c2 FROM v4 t1 LEFT JOIN ft5 t2 ON (t1.c1 = t2.c1) ORDER BY t1.c SELECT t1.c1, t2.c2 FROM v4 t1 LEFT JOIN ft5 t2 ON (t1.c1 = t2.c1) ORDER BY t1.c1, t2.c1 OFFSET 10 LIMIT 10; ALTER VIEW v4 OWNER TO regress_view_owner; +-- Make sure this big CROSS JOIN query is pushed down +EXPLAIN (VERBOSE, COSTS OFF) SELECT count(*) FROM ft1 CROSS JOIN ft2 CROSS JOIN ft4 CROSS JOIN ft5; +-- Make sure query cancellation works +SET statement_timeout = '10ms'; +select count(*) from ft1 CROSS JOIN ft2 CROSS JOIN ft4 CROSS JOIN ft5; -- this takes very long +RESET statement_timeout; + -- cleanup DROP OWNED BY regress_view_owner; DROP ROLE regress_view_owner; diff --git a/doc/src/sgml/libpq.sgml b/doc/src/sgml/libpq.sgml index 37ec3cb4e5..8e033bb8f3 100644 --- a/doc/src/sgml/libpq.sgml +++ b/doc/src/sgml/libpq.sgml @@ -265,7 +265,7 @@ PGconn *PQsetdb(char *pghost, PQconnectStartParamsPQconnectStartParams PQconnectStartPQconnectStart - PQconnectPollPQconnectPoll + PQconnectPollPQconnectPoll nonblocking connection @@ -499,6 +499,30 @@ switch(PQstatus(conn)) + + PQconnectCompletePQconnectComplete + + + Complete the connection attempt on a nonblocking connection and block + until it is completed. + + +int PQconnectComplete(PGconn *conn); + + + + + This function can be used instead of + + to complete a connection that was initially started in a nonblocking + manner. However, instead of continuing to complete the connection in a + nonblocking way, calling this function will block until the connection + is completed. This is especially useful to complete connections that were + started by . + + + + PQconndefaultsPQconndefaults @@ -660,7 +684,7 @@ void PQreset(PGconn *conn); PQresetStartPQresetStart - PQresetPollPQresetPoll + PQresetPollPQresetPoll Reset the communication channel to the server, in a nonblocking manner. @@ -5617,13 +5641,137 @@ int PQsetSingleRowMode(PGconn *conn); this section. 
+ + PQrequestCancelPQrequestCancel + + + + Requests that the server abandon processing of the current command. + +int PQrequestCancel(PGconn *conn); + + + + + This request is made over a connection that uses the same connection + options as the original PGconn. So when the + original connection is encrypted (using TLS or GSS), the connection for + the cancel request is encrypted in the same way. Any connection + options that only make sense for authentication or after authentication + are ignored though, because cancellation requests do not require + authentication. + + + + This function operates directly on the PGconn + object, and in case of failure stores the error message in the + PGconn object (whence it can be retrieved + by ). This behaviour makes this + function unsafe to call from within multi-threaded programs or + signal handlers, since it is possible that overwriting the + PGconn's error message will + mess up the operation currently in progress on the connection in another + thread. + + + + The return value is 1 if the cancel request was successfully + dispatched and 0 if not. Successful dispatch is no guarantee that the + request will have any effect, however. If the cancellation is effective, + the current command will terminate early and return an error result. If + the cancellation fails (say, because the server was already done + processing the command), then there will be no visible result at + all. + + + + + + PQrequestCancelStartPQrequestCancelStart + + + + A version of + + that can be used in a thread-safe and/or non-blocking manner. + +PGconn *PQrequestCancelStart(PGconn *conn); + + + + + This function returns a new PGconn. This + connection object can be used to cancel the query that's running on the + original connection in a thread-safe way. To do so + + must be called while no other thread is using the original PGconn. Then + the returned PGconn + can be used at a later point in any thread to send a cancel request. 
+ A cancel request can be sent using the returned PGconn in two ways, + non-blocking using + or blocking using . + + + + In addition to all the statuses that a regular + PGconn + can have, the returned connection can have two additional statuses: + + + + CONNECTION_STARTING + + + Waiting for the first call to , + to actually open the socket. This is the connection state right after + calling . No connection to the + server has been initiated yet at this point. To start cancel request + initiation use + for non-blocking behaviour and + for blocking behaviour. + + + + + + CONNECTION_CANCEL_FINISHED + + + Cancel request was successfully sent. It's not possible to continue + using the cancellation connection now, so it should be freed using + . It's also possible to reset the + cancellation connection instead using + , that way it can be reused to + cancel a future query on the same connection. + + + + + + + + Since this object represents a connection only meant for cancellations it + can only be used with a limited subset of the functions that can be used + for a regular PGconn object. The functions that + this object can be passed to are + , + , + , + , + , + , and + . + + + + PQgetCancelPQgetCancel Creates a data structure containing the information needed to cancel - a command issued through a particular database connection. + a command using . PGcancel *PQgetCancel(PGconn *conn); @@ -5665,7 +5813,9 @@ void PQfreeCancel(PGcancel *cancel); - Requests that the server abandon processing of the current command. + A less secure version of + + that can be used safely from within a signal handler. int PQcancel(PGcancel *cancel, char *errbuf, int errbufsize); @@ -5679,15 +5829,6 @@ int PQcancel(PGcancel *cancel, char *errbuf, int errbufsize); recommended size is 256 bytes). - - Successful dispatch is no guarantee that the request will have - any effect, however. If the cancellation is effective, the current - command will terminate early and return an error result. 
If the - cancellation fails (say, because the server was already done - processing the command), then there will be no visible result at - all. - - can safely be invoked from a signal handler, if the errbuf is a local variable in the @@ -5696,33 +5837,24 @@ int PQcancel(PGcancel *cancel, char *errbuf, int errbufsize); also be invoked from a thread that is separate from the one manipulating the PGconn object. - - - - - - - PQrequestCancelPQrequestCancel - - - - is a deprecated variant of - . - -int PQrequestCancel(PGconn *conn); - - - Requests that the server abandon processing of the current - command. It operates directly on the - PGconn object, and in case of failure stores the - error message in the PGconn object (whence it can - be retrieved by ). Although - the functionality is the same, this approach is not safe within - multiple-thread programs or signal handlers, since it is possible - that overwriting the PGconn's error message will - mess up the operation currently in progress on the connection. + To achieve signal-safety, some concessions needed to be made in the + implementation of . Not all connection + options of the original connection are used when establishing a + connection for the cancellation request. When calling this function a + connection is made to the postgres host using the same port. The only + connection options that are honored during this connection are + keepalives, + keepalives_idle, + keepalives_interval, + keepalives_count, and + tcp_user_timeout. + So, for example + connect_timeout, + gssencmode, and + sslmode are ignored. This means the connection + is never encrypted using TLS or GSS. @@ -8856,10 +8988,10 @@ int PQisthreadsafe(); - The deprecated functions and + The functions and are not thread-safe and should not be used in multithread programs. - can be replaced by . + can be replaced by . can be replaced by . 
diff --git a/src/fe_utils/connect_utils.c b/src/fe_utils/connect_utils.c index f2e583f9fa..b9f0c0558c 100644 --- a/src/fe_utils/connect_utils.c +++ b/src/fe_utils/connect_utils.c @@ -158,19 +158,11 @@ connectMaintenanceDatabase(ConnParams *cparams, void disconnectDatabase(PGconn *conn) { - char errbuf[256]; - Assert(conn != NULL); if (PQtransactionStatus(conn) == PQTRANS_ACTIVE) { - PGcancel *cancel; - - if ((cancel = PQgetCancel(conn))) - { - (void) PQcancel(cancel, errbuf, sizeof(errbuf)); - PQfreeCancel(cancel); - } + (void) PQrequestCancel(conn); } PQfinish(conn); diff --git a/src/interfaces/libpq/exports.txt b/src/interfaces/libpq/exports.txt index e8bcc88370..f7609d0c64 100644 --- a/src/interfaces/libpq/exports.txt +++ b/src/interfaces/libpq/exports.txt @@ -186,3 +186,5 @@ PQpipelineStatus 183 PQsetTraceFlags 184 PQmblenBounded 185 PQsendFlushRequest 186 +PQrequestCancelStart 187 +PQconnectComplete 188 diff --git a/src/interfaces/libpq/fe-connect.c b/src/interfaces/libpq/fe-connect.c index 6e936bbff3..7390fbec7c 100644 --- a/src/interfaces/libpq/fe-connect.c +++ b/src/interfaces/libpq/fe-connect.c @@ -379,6 +379,7 @@ static PGPing internal_ping(PGconn *conn); static PGconn *makeEmptyPGconn(void); static void pqFreeCommandQueue(PGcmdQueueEntry *queue); static bool fillPGconn(PGconn *conn, PQconninfoOption *connOptions); +static bool copyPGconn(PGconn *srcConn, PGconn *dstConn); static void freePGconn(PGconn *conn); static void closePGconn(PGconn *conn); static void release_conn_addrinfo(PGconn *conn); @@ -605,8 +606,17 @@ pqDropServerData(PGconn *conn) if (conn->write_err_msg) free(conn->write_err_msg); conn->write_err_msg = NULL; - conn->be_pid = 0; - conn->be_key = 0; + + /* + * Cancel connections should save their be_pid and be_key across + * PQresetStart invocations. Otherwise they don't know the secret token of + * the connection they are supposed to cancel anymore. 
+ */ + if (!conn->cancelRequest) + { + conn->be_pid = 0; + conn->be_key = 0; + } } @@ -737,6 +747,68 @@ PQping(const char *conninfo) return ret; } +/* + * PQrequestCancelStart + * + * Prepare a cancel connection for the query running on conn. The returned + * PGconn (NULL on OOM, CONNECTION_BAD on error) must then be polled with + * PQconnectPoll or completed with PQconnectComplete to send the cancel. + */ +PGconn * +PQrequestCancelStart(PGconn *conn) +{ + PGconn *cancelConn = makeEmptyPGconn(); + + if (cancelConn == NULL) + return NULL; + + /* Check we have an open connection */ + if (!conn) + { + appendPQExpBufferStr(&cancelConn->errorMessage, libpq_gettext("passed connection was NULL\n")); + return cancelConn; + } + + if (conn->sock == PGINVALID_SOCKET) + { + appendPQExpBufferStr(&cancelConn->errorMessage, libpq_gettext("passed connection is not open\n")); + return cancelConn; + } + + /* + * Indicate that this connection is used to send a cancellation + */ + cancelConn->cancelRequest = true; + + if (!copyPGconn(conn, cancelConn)) + return cancelConn; + + /* + * Compute derived options + */ + if (!connectOptions2(cancelConn)) + return cancelConn; + + /* + * Copy cancellation token data from the original connection + */ + cancelConn->be_pid = conn->be_pid; + cancelConn->be_key = conn->be_key; + + /* + * Cancel requests should not iterate over all possible hosts. The request + * needs to be sent to the exact host and address that the original + * connection used. + */ + memcpy(&cancelConn->raddr, &conn->raddr, sizeof(SockAddr)); + cancelConn->whichhost = conn->whichhost; + cancelConn->try_next_host = false; + cancelConn->try_next_addr = false; + + cancelConn->status = CONNECTION_STARTING; + return cancelConn; +} + /* * PQconnectStartParams * @@ -914,6 +986,46 @@ fillPGconn(PGconn *conn, PQconninfoOption *connOptions) return true; } +/* + * Copy over option values from srcConn to dstConn + * + * Don't put anything cute here --- intelligence should be in + * connectOptions2 ... + * + * Returns true on success. 
On failure, returns false and sets error message of + * dstConn. + */ +static bool +copyPGconn(PGconn *srcConn, PGconn *dstConn) +{ + const internalPQconninfoOption *option; + + /* copy over connection options */ + for (option = PQconninfoOptions; option->keyword; option++) + { + if (option->connofs >= 0) + { + const char **tmp = (const char **) ((char *) srcConn + option->connofs); + + if (*tmp) + { + char **dstConnmember = (char **) ((char *) dstConn + option->connofs); + + if (*dstConnmember) + free(*dstConnmember); + *dstConnmember = strdup(*tmp); + if (*dstConnmember == NULL) + { + appendPQExpBufferStr(&dstConn->errorMessage, + libpq_gettext("out of memory\n")); + return false; + } + } + } + } + return true; +} + /* * connectOptions1 * @@ -2082,10 +2194,17 @@ connectDBStart(PGconn *conn) * Set up to try to connect to the first host. (Setting whichhost = -1 is * a bit of a cheat, but PQconnectPoll will advance it to 0 before * anything else looks at it.) + * + * Cancel requests are special though, they should only try one host, + * which is determined in PQrequestCancelStart. So leave these settings + * alone for cancel requests. */ - conn->whichhost = -1; - conn->try_next_addr = false; - conn->try_next_host = true; + if (!conn->cancelRequest) + { + conn->whichhost = -1; + conn->try_next_host = true; + conn->try_next_addr = false; + } conn->status = CONNECTION_NEEDED; /* Also reset the target_server_type state if needed */ @@ -2134,6 +2253,15 @@ connectDBComplete(PGconn *conn) if (conn == NULL || conn->status == CONNECTION_BAD) return 0; + if (conn->status == CONNECTION_STARTING) + { + if (!connectDBStart(conn)) + { + conn->status = CONNECTION_BAD; + return 0; + } + } + /* * Set up a time limit, if connect_timeout isn't zero. */ @@ -2274,13 +2402,15 @@ PQconnectPoll(PGconn *conn) switch (conn->status) { /* - * We really shouldn't have been polled in these two cases, but we - * can handle it. 
+ * We really shouldn't have been polled in these three cases, but + * we can handle it. */ case CONNECTION_BAD: return PGRES_POLLING_FAILED; case CONNECTION_OK: return PGRES_POLLING_OK; + case CONNECTION_CANCEL_FINISHED: + return PGRES_POLLING_OK; /* These are reading states */ case CONNECTION_AWAITING_RESPONSE: @@ -2292,6 +2422,34 @@ PQconnectPoll(PGconn *conn) /* Load waiting data */ int n = pqReadData(conn); +#ifndef WIN32 + if (n == -2 && conn->cancelRequest) +#else + + /* + * Windows is a bit special in its EOF behaviour for TCP. + * Sometimes it will error with an ECONNRESET when there is a + * clean connection closure. See these threads for details: + * https://www.postgresql.org/message-id/flat/90b34057-4176-7bb0-0dbb-9822a5f6425b%40greiz-reinsdorf.de + * + * https://www.postgresql.org/message-id/flat/CA%2BhUKG%2BOeoETZQ%3DQw5Ub5h3tmwQhBmDA%3DnuNO3KG%3DzWfUypFAw%40mail.gmail.com + * + * PQcancel ignores such errors and reports success for the + * cancellation anyway, so even if this is not always correct + * we do the same here. + */ + if (n < 0 && conn->cancelRequest) +#endif + { + /* + * This is the expected end state for cancel connections. + * They are closed once the cancel is processed by the + * server. + */ + conn->status = CONNECTION_CANCEL_FINISHED; + resetPQExpBuffer(&conn->errorMessage); + return PGRES_POLLING_OK; + } if (n < 0) goto error_return; if (n == 0) @@ -2301,6 +2459,7 @@ PQconnectPoll(PGconn *conn) } /* These are writing states, so we just proceed. */ + case CONNECTION_STARTING: case CONNECTION_STARTED: case CONNECTION_MADE: break; @@ -2325,6 +2484,14 @@ keep_going: /* We will come back to here until there is /* Time to advance to next address, or next host if no more addresses? */ if (conn->try_next_addr) { + /* + * Cancel requests never have more addresses to try. They should only + * try a single one. 
+ */ + if (conn->cancelRequest) + { + goto error_return; + } if (conn->addr_cur && conn->addr_cur->ai_next) { conn->addr_cur = conn->addr_cur->ai_next; @@ -2344,6 +2511,15 @@ keep_going: /* We will come back to here until there is int ret; char portstr[MAXPGPATH]; + /* + * Cancel requests never have more hosts to try. They should only try + * a single one. + */ + if (conn->cancelRequest) + { + goto error_return; + } + if (conn->whichhost + 1 < conn->nconnhost) conn->whichhost++; else @@ -2529,19 +2705,27 @@ keep_going: /* We will come back to here until there is char host_addr[NI_MAXHOST]; /* - * Advance to next possible host, if we've tried all of - * the addresses for the current host. + * Cancel requests don't use addr_cur at all. They have + * their raddr field already filled in during + * initialization in PQrequestCancelStart. */ - if (addr_cur == NULL) + if (!conn->cancelRequest) { - conn->try_next_host = true; - goto keep_going; - } + /* + * Advance to next possible host, if we've tried all + * of the addresses for the current host. + */ + if (addr_cur == NULL) + { + conn->try_next_host = true; + goto keep_going; + } - /* Remember current address for possible use later */ - memcpy(&conn->raddr.addr, addr_cur->ai_addr, - addr_cur->ai_addrlen); - conn->raddr.salen = addr_cur->ai_addrlen; + /* Remember current address for possible use later */ + memcpy(&conn->raddr.addr, addr_cur->ai_addr, + addr_cur->ai_addrlen); + conn->raddr.salen = addr_cur->ai_addrlen; + } /* * Set connip, too. 
Note we purposely ignore strdup @@ -2557,7 +2741,7 @@ keep_going: /* We will come back to here until there is conn->connip = strdup(host_addr); /* Try to create the socket */ - conn->sock = socket(addr_cur->ai_family, SOCK_STREAM, 0); + conn->sock = socket(conn->raddr.addr.ss_family, SOCK_STREAM, 0); if (conn->sock == PGINVALID_SOCKET) { int errorno = SOCK_ERRNO; @@ -2567,12 +2751,18 @@ keep_going: /* We will come back to here until there is * addresses to try; this reduces useless chatter in * cases where the address list includes both IPv4 and * IPv6 but kernel only accepts one family. + * + * Cancel requests never have more addresses to try. + * They should only try a single one. */ - if (addr_cur->ai_next != NULL || - conn->whichhost + 1 < conn->nconnhost) + if (!conn->cancelRequest) { - conn->try_next_addr = true; - goto keep_going; + if (addr_cur->ai_next != NULL || + conn->whichhost + 1 < conn->nconnhost) + { + conn->try_next_addr = true; + goto keep_going; + } } emitHostIdentityInfo(conn, host_addr); appendPQExpBuffer(&conn->errorMessage, @@ -2595,7 +2785,7 @@ keep_going: /* We will come back to here until there is * TCP sockets, nonblock mode, close-on-exec. Try the * next address if any of this fails. */ - if (addr_cur->ai_family != AF_UNIX) + if (conn->raddr.addr.ss_family != AF_UNIX) { if (!connectNoDelay(conn)) { @@ -2624,7 +2814,7 @@ keep_going: /* We will come back to here until there is } #endif /* F_SETFD */ - if (addr_cur->ai_family != AF_UNIX) + if (conn->raddr.addr.ss_family != AF_UNIX) { #ifndef WIN32 int on = 1; @@ -2718,8 +2908,9 @@ keep_going: /* We will come back to here until there is * Start/make connection. This should not block, since we * are in nonblock mode. If it does, well, too bad. 
*/ - if (connect(conn->sock, addr_cur->ai_addr, - addr_cur->ai_addrlen) < 0) + if (connect(conn->sock, + (struct sockaddr *) &conn->raddr.addr, + conn->raddr.salen) < 0) { if (SOCK_ERRNO == EINPROGRESS || #ifdef WIN32 @@ -2758,6 +2949,16 @@ keep_going: /* We will come back to here until there is } } + case CONNECTION_STARTING: + { + if (!connectDBStart(conn)) + { + goto error_return; + } + conn->status = CONNECTION_STARTED; + return PGRES_POLLING_WRITING; + } + case CONNECTION_STARTED: { socklen_t optlen = sizeof(optval); @@ -2966,6 +3167,25 @@ keep_going: /* We will come back to here until there is } #endif /* USE_SSL */ + if (conn->cancelRequest) + { + CancelRequestPacket cancelpacket; + + packetlen = sizeof(cancelpacket); + cancelpacket.cancelRequestCode = (MsgType) pg_hton32(CANCEL_REQUEST_CODE); + cancelpacket.backendPID = pg_hton32(conn->be_pid); + cancelpacket.cancelAuthCode = pg_hton32(conn->be_key); + if (pqPacketSend(conn, 0, &cancelpacket, packetlen) != STATUS_OK) + { + appendPQExpBuffer(&conn->errorMessage, + libpq_gettext("could not send cancel packet: %s\n"), + SOCK_STRERROR(SOCK_ERRNO, sebuf, sizeof(sebuf))); + goto error_return; + } + conn->status = CONNECTION_AWAITING_RESPONSE; + return PGRES_POLLING_READING; + } + /* * Build the startup packet. */ @@ -4194,6 +4414,11 @@ release_conn_addrinfo(PGconn *conn) static void sendTerminateConn(PGconn *conn) { + if (conn->cancelRequest) + { + return; + } + /* * Note that the protocol doesn't allow us to send Terminate messages * during the startup phase. @@ -4311,6 +4536,12 @@ PQresetStart(PGconn *conn) { closePGconn(conn); + if (conn->cancelRequest) + { + conn->status = CONNECTION_STARTING; + return 1; + } + return connectDBStart(conn); } @@ -4663,6 +4894,22 @@ cancel_errReturn: return false; } +/* + * PQconnectComplete: takes a non blocking cancel connection and completes it + * in a blocking manner. + * + * Returns 1 if able to connect successfully and 0 if not. 
+ * + * This can be useful if you only care about the thread safety of + * PQrequestCancelStart and not about its non-blocking functionality. + */ +int +PQconnectComplete(PGconn *cancelConn) +{ + connectDBComplete(cancelConn); + return cancelConn != NULL && cancelConn->status != CONNECTION_BAD; +} + /* * PQrequestCancel: old, not thread-safe function for requesting query cancel @@ -4679,45 +4926,31 @@ cancel_errReturn: int PQrequestCancel(PGconn *conn) { - int r; - PGcancel *cancel; - - /* Check we have an open connection */ - if (!conn) - return false; + PGconn *cancelConn = NULL; - if (conn->sock == PGINVALID_SOCKET) + cancelConn = PQrequestCancelStart(conn); + if (!cancelConn) { - strlcpy(conn->errorMessage.data, - "PQrequestCancel() -- connection is not open\n", - conn->errorMessage.maxlen); - conn->errorMessage.len = strlen(conn->errorMessage.data); - conn->errorReported = 0; - + appendPQExpBufferStr(&conn->errorMessage, libpq_gettext("out of memory\n")); return false; } - cancel = PQgetCancel(conn); - if (cancel) - { - r = PQcancel(cancel, conn->errorMessage.data, - conn->errorMessage.maxlen); - PQfreeCancel(cancel); - } - else + if (cancelConn->status == CONNECTION_BAD) { - strlcpy(conn->errorMessage.data, "out of memory", - conn->errorMessage.maxlen); - r = false; + appendPQExpBufferStr(&conn->errorMessage, PQerrorMessage(cancelConn)); + PQfinish(cancelConn); + return false; } - if (!r) + if (!PQconnectComplete(cancelConn)) { - conn->errorMessage.len = strlen(conn->errorMessage.data); - conn->errorReported = 0; + appendPQExpBufferStr(&conn->errorMessage, PQerrorMessage(cancelConn)); + PQfinish(cancelConn); + return false; } - return r; + PQfinish(cancelConn); + return true; } diff --git a/src/interfaces/libpq/fe-misc.c b/src/interfaces/libpq/fe-misc.c index d76bb3957a..a944cb2c12 100644 --- a/src/interfaces/libpq/fe-misc.c +++ b/src/interfaces/libpq/fe-misc.c @@ -558,8 +558,11 @@ pqPutMsgEnd(PGconn *conn) * Possible return values: * 1: successfully loaded at least one more 
byte * 0: no data is presently available, but no error detected - * -1: error detected (including EOF = connection closure); + * -1: error detected (excluding EOF = connection closure); * conn->errorMessage set + * -2: EOF detected, connection is closed + * conn->errorMessage set + * * NOTE: callers must not assume that pointers or indexes into conn->inBuffer * remain valid across this call! * ---------- @@ -642,7 +645,7 @@ retry3: default: /* pqsecure_read set the error message for us */ - return -1; + return nread; } } if (nread > 0) @@ -737,7 +740,7 @@ retry4: default: /* pqsecure_read set the error message for us */ - return -1; + return nread; } } if (nread > 0) @@ -755,13 +758,17 @@ definitelyEOF: libpq_gettext("server closed the connection unexpectedly\n" "\tThis probably means the server terminated abnormally\n" "\tbefore or while processing the request.\n")); + /* Do *not* drop any already-read data; caller still wants it */ + pqDropConnection(conn, false); + conn->status = CONNECTION_BAD; /* No more connection to backend */ + return -2; /* Come here if lower-level code already set a suitable errorMessage */ definitelyFailed: /* Do *not* drop any already-read data; caller still wants it */ pqDropConnection(conn, false); conn->status = CONNECTION_BAD; /* No more connection to backend */ - return -1; + return nread < 0 ? 
nread : -1; } /* diff --git a/src/interfaces/libpq/fe-secure-openssl.c b/src/interfaces/libpq/fe-secure-openssl.c index 8117cbd40f..c553a74898 100644 --- a/src/interfaces/libpq/fe-secure-openssl.c +++ b/src/interfaces/libpq/fe-secure-openssl.c @@ -255,7 +255,7 @@ rloop: appendPQExpBufferStr(&conn->errorMessage, libpq_gettext("SSL connection has been closed unexpectedly\n")); result_errno = ECONNRESET; - n = -1; + n = -2; break; default: appendPQExpBuffer(&conn->errorMessage, diff --git a/src/interfaces/libpq/fe-secure.c b/src/interfaces/libpq/fe-secure.c index a1dc7b796d..9771805dd3 100644 --- a/src/interfaces/libpq/fe-secure.c +++ b/src/interfaces/libpq/fe-secure.c @@ -201,6 +201,12 @@ pqsecure_close(PGconn *conn) * On failure, this function is responsible for appending a suitable message * to conn->errorMessage. The caller must still inspect errno, but only * to determine whether to continue/retry after error. + * + * Returns -1 in case of failures, except in the case where a failure means + * that there was a clean connection closure; in those cases -2 is returned. + * Currently only the TLS implementation of pqsecure_read ever returns -2. For + * the other implementations a clean connection closure is detected in + * pqReadData instead. */ ssize_t pqsecure_read(PGconn *conn, void *ptr, size_t len) diff --git a/src/interfaces/libpq/libpq-fe.h b/src/interfaces/libpq/libpq-fe.h index 7986445f1a..24695a6026 100644 --- a/src/interfaces/libpq/libpq-fe.h +++ b/src/interfaces/libpq/libpq-fe.h @@ -59,12 +59,15 @@ typedef enum { CONNECTION_OK, CONNECTION_BAD, + CONNECTION_CANCEL_FINISHED, /* Non-blocking mode only below here */ /* * The existence of these should never be relied upon - they should only * be used for user feedback or similar purposes. */ + CONNECTION_STARTING, /* Waiting for connection attempt to be + * started. */ CONNECTION_STARTED, /* Waiting for connection to be made. */ CONNECTION_MADE, /* Connection OK; waiting to send. 
*/ CONNECTION_AWAITING_RESPONSE, /* Waiting for a response from the @@ -282,6 +285,7 @@ extern PGconn *PQconnectStart(const char *conninfo); extern PGconn *PQconnectStartParams(const char *const *keywords, const char *const *values, int expand_dbname); extern PostgresPollingStatusType PQconnectPoll(PGconn *conn); +extern int PQconnectComplete(PGconn *conn); /* Synchronous (blocking) */ extern PGconn *PQconnectdb(const char *conninfo); @@ -330,9 +334,12 @@ extern void PQfreeCancel(PGcancel *cancel); /* issue a cancel request */ extern int PQcancel(PGcancel *cancel, char *errbuf, int errbufsize); -/* backwards compatible version of PQcancel; not thread-safe */ +/* more secure version of PQcancel; not thread-safe */ extern int PQrequestCancel(PGconn *conn); +/* non-blocking and thread-safe version of PQrequestCancel */ +extern PGconn *PQrequestCancelStart(PGconn *conn); + /* Accessor functions for PGconn objects */ extern char *PQdb(const PGconn *conn); extern char *PQuser(const PGconn *conn); diff --git a/src/interfaces/libpq/libpq-int.h b/src/interfaces/libpq/libpq-int.h index 3db6a17db4..b9ce1d58c1 100644 --- a/src/interfaces/libpq/libpq-int.h +++ b/src/interfaces/libpq/libpq-int.h @@ -394,6 +394,10 @@ struct pg_conn char *ssl_max_protocol_version; /* maximum TLS protocol version */ char *target_session_attrs; /* desired session properties */ + bool cancelRequest; /* true if this connection is used to send a + * cancel request, instead of being a normal + * connection that's used for queries */ + /* Optional file to write trace info to */ FILE *Pfdebug; int traceFlags; diff --git a/src/test/isolation/isolationtester.c b/src/test/isolation/isolationtester.c index 12179f2514..b073235197 100644 --- a/src/test/isolation/isolationtester.c +++ b/src/test/isolation/isolationtester.c @@ -948,26 +948,18 @@ try_complete_step(TestSpec *testspec, PermutationStep *pstep, int flags) */ if (td > max_step_wait && !canceled) { - PGcancel *cancel = PQgetCancel(conn); - - if (cancel 
!= NULL) + if (PQrequestCancel(conn)) { - char buf[256]; - - if (PQcancel(cancel, buf, sizeof(buf))) - { - /* - * print to stdout not stderr, as this should appear - * in the test case's results - */ - printf("isolationtester: canceling step %s after %d seconds\n", - step->name, (int) (td / USECS_PER_SEC)); - canceled = true; - } - else - fprintf(stderr, "PQcancel failed: %s\n", buf); - PQfreeCancel(cancel); + /* + * print to stdout not stderr, as this should appear in + * the test case's results + */ + printf("isolationtester: canceling step %s after %d seconds\n", + step->name, (int) (td / USECS_PER_SEC)); + canceled = true; } + else + fprintf(stderr, "PQcancel failed: %s\n", PQerrorMessage(conn)); } /* diff --git a/src/test/modules/libpq_pipeline/libpq_pipeline.c b/src/test/modules/libpq_pipeline/libpq_pipeline.c index 0ff563f59a..52503907bd 100644 --- a/src/test/modules/libpq_pipeline/libpq_pipeline.c +++ b/src/test/modules/libpq_pipeline/libpq_pipeline.c @@ -86,6 +86,275 @@ pg_fatal_impl(int line, const char *fmt,...) exit(1); } +/* + * Check that the query on the given connection got cancelled. + * + * This is a function wrapped in a macro to make the reported line number + * in an error match the line number of the invocation. 
+ */ +#define confirm_query_cancelled(conn) confirm_query_cancelled_impl(__LINE__, conn) +static void +confirm_query_cancelled_impl(int line, PGconn *conn) +{ + PGresult *res = NULL; + + res = PQgetResult(conn); + if (res == NULL) + pg_fatal_impl(line, "PQgetResult returned null: %s", + PQerrorMessage(conn)); + if (PQresultStatus(res) != PGRES_FATAL_ERROR) + pg_fatal_impl(line, "query did not fail when it was expected"); + if (strcmp(PQresultErrorField(res, PG_DIAG_SQLSTATE), "57014") != 0) + pg_fatal_impl(line, "query failed with a different error than cancellation: %s", + PQerrorMessage(conn)); + PQclear(res); + while (PQisBusy(conn)) + { + PQconsumeInput(conn); + } +} + +#define send_cancellable_query(conn, monitorConn) send_cancellable_query_impl(__LINE__, conn, monitorConn) +static void +send_cancellable_query_impl(int line, PGconn *conn, PGconn *monitorConn) +{ + if (PQsendQuery(conn, "SELECT pg_sleep(30)") != 1) + pg_fatal_impl(line, "failed to send query: %s", PQerrorMessage(conn)); + + /* + * Wait until the query is actually running. Otherwise sending a + * cancellation request might not cancel the query due to race conditions. 
+ */ + while (true) + { + char *value = NULL; + PGresult *res = PQexec( + monitorConn, + "SELECT count(*) FROM pg_stat_activity WHERE " + "query = 'SELECT pg_sleep(30)' " + "AND state = 'active'"); + + if (PQresultStatus(res) != PGRES_TUPLES_OK) + { + pg_fatal("Connection to database failed: %s", PQerrorMessage(monitorConn)); + } + if (PQntuples(res) != 1) + { + pg_fatal("unexpected number of rows received: %d", PQntuples(res)); + } + if (PQnfields(res) != 1) + { + pg_fatal("unexpected number of columns received: %d", PQnfields(res)); + } + value = PQgetvalue(res, 0, 0); + if (*value != '0') + { + PQclear(res); + break; + } + PQclear(res); + + /* + * wait 10ms before polling again + */ + pg_usleep(10000); + } +} + +static void +test_cancel(PGconn *conn, const char *conninfo) +{ + PGcancel *cancel = NULL; + PGconn *cancelConn = NULL; + PGconn *monitorConn = NULL; + char errorbuf[256]; + + fprintf(stderr, "test cancellations... "); + + if (PQsetnonblocking(conn, 1) != 0) + pg_fatal("failed to set nonblocking mode: %s", PQerrorMessage(conn)); + + /* + * Make a connection to the database to monitor the query on the main + * connection. 
+ */ + monitorConn = PQconnectdb(conninfo); + if (PQstatus(monitorConn) != CONNECTION_OK) + { + pg_fatal("Connection to database failed: %s", + PQerrorMessage(monitorConn)); + } + + /* test PQcancel */ + send_cancellable_query(conn, monitorConn); + cancel = PQgetCancel(conn); + if (!PQcancel(cancel, errorbuf, sizeof(errorbuf))) + { + pg_fatal("failed to run PQcancel: %s", errorbuf); + } + confirm_query_cancelled(conn); + + /* PGcancel object can be reused for the next query */ + send_cancellable_query(conn, monitorConn); + if (!PQcancel(cancel, errorbuf, sizeof(errorbuf))) + { + pg_fatal("failed to run PQcancel: %s", errorbuf); + } + confirm_query_cancelled(conn); + + PQfreeCancel(cancel); + + /* test PQrequestCancel */ + send_cancellable_query(conn, monitorConn); + if (!PQrequestCancel(conn)) + pg_fatal("failed to run PQrequestCancel: %s", PQerrorMessage(conn)); + confirm_query_cancelled(conn); + + /* test PQrequestCancelStart and then polling with PQconnectPoll */ + send_cancellable_query(conn, monitorConn); + cancelConn = PQrequestCancelStart(conn); + if (PQstatus(cancelConn) == CONNECTION_BAD) + pg_fatal("bad cancel connection: %s", PQerrorMessage(cancelConn)); + while (true) + { + struct timeval tv; + fd_set input_mask; + fd_set output_mask; + PostgresPollingStatusType pollres = PQconnectPoll(cancelConn); + int sock = PQsocket(cancelConn); + + if (pollres == PGRES_POLLING_OK) + { + break; + } + + FD_ZERO(&input_mask); + FD_ZERO(&output_mask); + switch (pollres) + { + case PGRES_POLLING_READING: + pg_debug("polling for reads\n"); + FD_SET(sock, &input_mask); + break; + case PGRES_POLLING_WRITING: + pg_debug("polling for writes\n"); + FD_SET(sock, &output_mask); + break; + default: + pg_fatal("bad cancel connection: %s", PQerrorMessage(cancelConn)); + } + + if (sock < 0) + pg_fatal("sock did not exist: %s", PQerrorMessage(cancelConn)); + + tv.tv_sec = 3; + tv.tv_usec = 0; + + while (true) + { + if (select(sock + 1, &input_mask, &output_mask, NULL, &tv) < 0) + { + if 
(errno == EINTR) + continue; + pg_fatal("select() failed: %m"); + } + break; + } + } + if (PQstatus(cancelConn) != CONNECTION_CANCEL_FINISHED) + pg_fatal("unexpected cancel connection status: %s", PQerrorMessage(cancelConn)); + confirm_query_cancelled(conn); + + /* + * test PQresetStart works on the cancel connection and it can be reused + * after + */ + if (!PQresetStart(cancelConn)) + { + pg_fatal("cancel connection reset failed: %s", PQerrorMessage(cancelConn)); + } + + send_cancellable_query(conn, monitorConn); + if (PQstatus(cancelConn) == CONNECTION_BAD) + pg_fatal("bad cancel connection: %s", PQerrorMessage(cancelConn)); + while (true) + { + struct timeval tv; + fd_set input_mask; + fd_set output_mask; + PostgresPollingStatusType pollres = PQresetPoll(cancelConn); + int sock = PQsocket(cancelConn); + + if (pollres == PGRES_POLLING_OK) + { + break; + } + + FD_ZERO(&input_mask); + FD_ZERO(&output_mask); + switch (pollres) + { + case PGRES_POLLING_READING: + pg_debug("polling for reads\n"); + FD_SET(sock, &input_mask); + break; + case PGRES_POLLING_WRITING: + pg_debug("polling for writes\n"); + FD_SET(sock, &output_mask); + break; + default: + pg_fatal("bad cancel connection: %s", PQerrorMessage(cancelConn)); + } + + if (sock < 0) + pg_fatal("sock did not exist: %s", PQerrorMessage(cancelConn)); + + tv.tv_sec = 3; + tv.tv_usec = 0; + + while (true) + { + if (select(sock + 1, &input_mask, &output_mask, NULL, &tv) < 0) + { + if (errno == EINTR) + continue; + pg_fatal("select() failed: %m"); + } + break; + } + } + if (PQstatus(cancelConn) != CONNECTION_CANCEL_FINISHED) + pg_fatal("unexpected cancel connection status: %s", PQerrorMessage(cancelConn)); + confirm_query_cancelled(conn); + + PQfinish(cancelConn); + + /* test PQconnectComplete */ + send_cancellable_query(conn, monitorConn); + cancelConn = PQrequestCancelStart(conn); + if (PQstatus(cancelConn) == CONNECTION_BAD) + pg_fatal("bad cancel connection: %s", PQerrorMessage(cancelConn)); + if 
(!PQconnectComplete(cancelConn)) + pg_fatal("failed to send cancel: %s", PQerrorMessage(cancelConn)); + confirm_query_cancelled(conn); + + /* test PQconnectComplete with reset connection */ + if (!PQresetStart(cancelConn)) + { + pg_fatal("cancel connection reset failed: %s", PQerrorMessage(cancelConn)); + } + + send_cancellable_query(conn, monitorConn); + if (PQstatus(cancelConn) == CONNECTION_BAD) + pg_fatal("bad cancel connection: %s", PQerrorMessage(cancelConn)); + if (!PQconnectComplete(cancelConn)) + pg_fatal("failed to send cancel: %s", PQerrorMessage(cancelConn)); + confirm_query_cancelled(conn); + PQfinish(cancelConn); + + fprintf(stderr, "ok\n"); +} + static void test_disallowed_in_pipeline(PGconn *conn) { @@ -1545,6 +1814,7 @@ usage(const char *progname) static void print_test_list(void) { + printf("cancel\n"); printf("disallowed_in_pipeline\n"); printf("multi_pipelines\n"); printf("nosync\n"); @@ -1642,7 +1912,9 @@ main(int argc, char **argv) PQTRACE_SUPPRESS_TIMESTAMPS | PQTRACE_REGRESS_MODE); } - if (strcmp(testname, "disallowed_in_pipeline") == 0) + if (strcmp(testname, "cancel") == 0) + test_cancel(conn, conninfo); + else if (strcmp(testname, "disallowed_in_pipeline") == 0) test_disallowed_in_pipeline(conn); else if (strcmp(testname, "multi_pipelines") == 0) test_multi_pipelines(conn); -- 2.34.1