From 22e45642aeade25c0983e260a5ab93124e2da082 Mon Sep 17 00:00:00 2001 From: Jelte Fennema Date: Wed, 12 Jan 2022 09:52:05 +0100 Subject: [PATCH v11 4/5] Add non-blocking version of PQcancel This patch makes the following changes in libpq: 1. Add a new PQcancelSend function, which sends cancellation requests using the regular connection establishment code. This makes sure that cancel requests support and use all connection options including encryption. 2. Add a new PQcancelConn function which allows sending cancellation in a non-blocking way by using it together with the newly added PQcancelPoll and PQcancelSocket. 3. Use these two new cancellation APIs everywhere in the codebase where signal-safety is not a necessity. The existing PQcancel API is using blocking IO. This makes PQcancel impossible to use in an event loop based codebase, without blocking the event loop until the call returns. PQcancelConn can now be used instead, to have a non-blocking way of sending cancel requests. The postgres_fdw cancellation code has been modified to make use of this. This patch also includes a test for all of libpq cancellation APIs. The test can be easily run like this: cd src/test/modules/libpq_pipeline make && ./libpq_pipeline cancel --- doc/src/sgml/libpq.sgml | 275 ++++++++++- src/interfaces/libpq/exports.txt | 8 + src/interfaces/libpq/fe-connect.c | 452 +++++++++++++++++- src/interfaces/libpq/libpq-fe.h | 25 +- src/interfaces/libpq/libpq-int.h | 9 + .../modules/libpq_pipeline/libpq_pipeline.c | 265 +++++++++- 6 files changed, 982 insertions(+), 52 deletions(-) diff --git a/doc/src/sgml/libpq.sgml b/doc/src/sgml/libpq.sgml index 0e7ae70c70..53b64865bb 100644 --- a/doc/src/sgml/libpq.sgml +++ b/doc/src/sgml/libpq.sgml @@ -265,7 +265,7 @@ PGconn *PQsetdb(char *pghost, PQconnectStartParamsPQconnectStartParams PQconnectStartPQconnectStart - PQconnectPollPQconnectPoll + PQconnectPollPQconnectPoll nonblocking connection @@ -4909,7 +4909,7 @@ int PQisBusy(PGconn *conn); / can also attempt to cancel a command that is still being processed by the server; see . But regardless of - the return value of , the application + the return value of , the application must continue with the normal result-reading sequence using . A successful cancellation will simply cause the command to terminate sooner than it would have @@ -5628,13 +5628,218 @@ int PQsetSingleRowMode(PGconn *conn); this section. + + PQcancelSendPQcancelSend + + + + Requests that the server abandons processing of the current command. + +PGcancelConn *PQcancelSend(PGconn *conn); + + + + + This request is made over a connection that uses the same connection + options as the the original PGconn. So when the + original connection is encrypted (using TLS or GSS), the connection for + the cancel request connection is encrypted in the same. Any connection + options that only make sense for authentication or after authentication + are ignored though, because cancellation requests do not require + authentication. + + + + This function returns a PGcancelConn + object. By using + it can be checked if there was any error when sending the cancellation + request. If + returns for CONNECTION_OK the request was + successfully sent, but if it returns CONNECTION_BAD + an error occured. If an error occured the error message can be retrieved using + . + + + + Successful dispatch of the cancellation is no guarantee that the request + will have any effect, however. If the cancellation is effective, the + command being cancelled will terminate early and return an error result. + If the cancellation fails (say, because the server was already done + processing the command), then there will be no visible result at all. + + + + Note that when PQcancelSend returns a non-null + pointer, you must call when you + are finished with it, in order to dispose of the structure and any + associated memory blocks. This must be done even if the cancel request + failed. + + + + + + PQcancelConnPQcancelConn + + + + A version of that can be used + in a non-blocking manner. + +PGcancelConn *PQcancelConn(PGconn *conn); + + + + + creates a + PGcancelConnPGcancelConn, + but it won't instantly start sending a cancel request over this + connection like . + should be called on the return + value to check if the PGcancelConn was + created successfully. The PGcancelConn object + is an opaque structure that is not meant to be accessed directly by the + application. This PGcancelConn object can be + used to cancel the query that's running on the original connection in a + thread-safe and non-blocking way. + + + + Note that when PQcancelConn returns a non-null + pointer, you must call when you + are finished with it, in order to dispose of the structure and any + associated memory blocks. This must be done even if the cancel request + failed or was abandoned. + + + + + + PQcancelStatusPQcancelStatus + + + + A version of that can be used for + cancellation connections. + +ConnStatusType PQcancelStatus(const PGcancelConn *conn); + + + + In addition to all the statuses that a PGconn + can have, this connection can have one additional status: + + + + CONNECTION_STARTING + + + Waiting for the first call to , + to actually open the socket. This is the connection state right after + calling . No connection to the + server has been initiated yet at this point. To actually start + sending the cancel request use . + + + + + + + + One final note about the returned statuses is that + CONNECTION_OK has a slightly different meaning for a + PGcancelConn than what it has for a + PGconn. When + returns CONNECTION_OK for a PGcancelConn + it means that that the dispatch of the cancel request has completed (although + this is no promise that the query was actually cancelled) and that the + connection is now closed. While a CONNECTION_OK result + for PGconn means that queries can be sent over + the connection. + + + + + + + PQcancelPollPQcancelPoll + + + + A version of that can be used for + cancellation connections. + +PostgresPollingStatusType PQcancelPoll(PGcancelConn *conn); + + + + + + + PQcancelErrorMessagePQcancelErrorMessage + + + + A version of that can be used for + cancellation connections. + +char *PQcancelErrorMessage(const PGcancelConn *conn); + + + + + + + PQcancelFinishPQcancelFinish + + + Closes the cancel connection (if it did not finish sending the cancel + request yet). Also frees memory used by the PGcancelConn + object. + +void PQcancelFinish(PGcancelConn *conn); + + + + + Note that even if the cancel attempt fails (as + indicated by ), the application should call + to free the memory used by the PGcancelConn object. + The PGcancelConn pointer must not be used again after + has been called. + + + + + + PQcancelResetPQcancelReset + + + Resets the PGcancelConn so it can be reused for a new + cancel connection. + +void PQcancelReset(PGcancelConn *conn); + + + + + If the PGcancelConn is currently used to send a cancel + request, then this connection is closed. It will then prepare the + PGcancelConn object such that it can be used to send a + new cancel request. This can be used to create one PGcancelConn + for a PGconn and reuse that multiple times throughout + the lifetime of the original PGconn. + + + + PQgetCancelPQgetCancel Creates a data structure containing the information needed to cancel - a command issued through a particular database connection. + a command using . PGcancel *PQgetCancel(PGconn *conn); @@ -5676,14 +5881,28 @@ void PQfreeCancel(PGcancel *cancel); - Requests that the server abandon processing of the current command. + An insecure version of , but one + that can be used safely from within a signal handler. int PQcancel(PGcancel *cancel, char *errbuf, int errbufsize); - The return value is 1 if the cancel request was successfully + should only be used if it's necessary + to cancel a query from a signal-handler. If signal-safety is not needed, + should be used to cancel the query + instead. can be safely invoked from a + signal handler, if the errbuf is a local variable + in the signal handler. The PGcancel object is + read-only as far as is concerned, so it + can also be invoked from a thread that is separate from the one + manipulating the PGconn object. + + + + The return value of + is 1 if the cancel request was successfully dispatched and 0 if not. If not, errbuf is filled with an explanatory error message. errbuf must be a char array of size errbufsize (the @@ -5691,21 +5910,22 @@ int PQcancel(PGcancel *cancel, char *errbuf, int errbufsize); - Successful dispatch is no guarantee that the request will have - any effect, however. If the cancellation is effective, the current - command will terminate early and return an error result. If the - cancellation fails (say, because the server was already done - processing the command), then there will be no visible result at - all. - - - - can safely be invoked from a signal - handler, if the errbuf is a local variable in the - signal handler. The PGcancel object is read-only - as far as is concerned, so it can - also be invoked from a thread that is separate from the one - manipulating the PGconn object. + To achieve signal-safety, some concessions needed to be made in the + implementation of . Not all connection + options of the original connection are used when establishing a + connection for the cancellation request. This function connects to + postgres on the same address and port as tha original connection. The + only connection options that are honored during this connection are + keepalives, + keepalives_idle, + keepalives_interval, + keepalives_count, and + tcp_user_timeout. + So, for example + connect_timeout, + gssencmode, and + sslmode are ignored. This means the connection + is never encrypted using TLS or GSS. @@ -5717,13 +5937,22 @@ int PQcancel(PGcancel *cancel, char *errbuf, int errbufsize); - is a deprecated variant of - . + is a deprecated and insecure + variant of . int PQrequestCancel(PGconn *conn); + + only exists because of backwards + compatibility reasons. should be + used instead, to avoid the security and thread-safety issues that this + function has. This function has the same security issues as + , but without the benefit of being + signal-safe. + + Requests that the server abandon processing of the current command. It operates directly on the @@ -8872,7 +9101,7 @@ int PQisthreadsafe(); The deprecated functions and are not thread-safe and should not be used in multithread programs. - can be replaced by . + can be replaced by . can be replaced by . diff --git a/src/interfaces/libpq/exports.txt b/src/interfaces/libpq/exports.txt index e8bcc88370..f56e8c185c 100644 --- a/src/interfaces/libpq/exports.txt +++ b/src/interfaces/libpq/exports.txt @@ -186,3 +186,11 @@ PQpipelineStatus 183 PQsetTraceFlags 184 PQmblenBounded 185 PQsendFlushRequest 186 +PQcancelSend 187 +PQcancelConn 188 +PQcancelPoll 189 +PQcancelStatus 190 +PQcancelSocket 191 +PQcancelErrorMessage 192 +PQcancelReset 193 +PQcancelFinish 194 diff --git a/src/interfaces/libpq/fe-connect.c b/src/interfaces/libpq/fe-connect.c index 46afe127f1..cef341adcd 100644 --- a/src/interfaces/libpq/fe-connect.c +++ b/src/interfaces/libpq/fe-connect.c @@ -376,8 +376,10 @@ static PGPing internal_ping(PGconn *conn); static PGconn *makeEmptyPGconn(void); static void pqFreeCommandQueue(PGcmdQueueEntry *queue); static bool fillPGconn(PGconn *conn, PQconninfoOption *connOptions); +static bool copyPGconn(PGconn *srcConn, PGconn *dstConn); static void freePGconn(PGconn *conn); static void closePGconn(PGconn *conn); +static void release_conn_hosts(PGconn *conn); static void release_conn_addrinfo(PGconn *conn); static bool store_conn_addrinfo(PGconn *conn, struct addrinfo *addrlist); static void sendTerminateConn(PGconn *conn); @@ -600,8 +602,17 @@ pqDropServerData(PGconn *conn) conn->write_failed = false; free(conn->write_err_msg); conn->write_err_msg = NULL; - conn->be_pid = 0; - conn->be_key = 0; + + /* + * Cancel connections should save their be_pid and be_key across + * PQcancelReset invocations. Otherwise they would not have access to the + * secret token of the connection they are supposed to cancel anymore. + */ + if (!conn->cancelRequest) + { + conn->be_pid = 0; + conn->be_key = 0; + } } @@ -732,6 +743,113 @@ PQping(const char *conninfo) return ret; } +/* + * PQcancelConn + * + * Asynchronously cancel a query on the given connection. This requires polling + * the returned PGcancelConn to actually complete the cancellation of the + * query. + */ +PGcancelConn * +PQcancelConn(PGconn *conn) +{ + PGconn *cancelConn = makeEmptyPGconn(); + pg_conn_host originalHost; + + if (cancelConn == NULL) + return NULL; + + /* Check we have an open connection */ + if (!conn) + { + libpq_append_conn_error(cancelConn, "passed connection was NULL"); + return (PGcancelConn *) cancelConn; + } + + if (conn->sock == PGINVALID_SOCKET) + { + libpq_append_conn_error(cancelConn, "passed connection is not open"); + return (PGcancelConn *) cancelConn; + } + + + /* + * Indicate that this connection is used to send a cancellation + */ + cancelConn->cancelRequest = true; + + if (!copyPGconn(conn, cancelConn)) + return (PGcancelConn *) cancelConn; + + /* + * Compute derived options + */ + if (!connectOptions2(cancelConn)) + return (PGcancelConn *) cancelConn; + + /* + * Copy cancelation token data from the original connnection + */ + cancelConn->be_pid = conn->be_pid; + cancelConn->be_key = conn->be_key; + + /* + * Cancel requests should not iterate over all possible hosts. The request + * needs to be sent to the exact host and address that the original + * connection used. So we we manually create the host and address arrays + * with a single element after freeing the host array that we generated + * from the connection options. + */ + release_conn_hosts(cancelConn); + cancelConn->nconnhost = 1; + cancelConn->naddr = 1; + + cancelConn->connhost = calloc(cancelConn->nconnhost, sizeof(pg_conn_host)); + if (!cancelConn->connhost) + goto oom_error; + + originalHost = conn->connhost[conn->whichhost]; + if (originalHost.host) + { + cancelConn->connhost[0].host = strdup(originalHost.host); + if (!cancelConn->connhost[0].host) + goto oom_error; + } + if (originalHost.hostaddr) + { + cancelConn->connhost[0].hostaddr = strdup(originalHost.hostaddr); + if (!cancelConn->connhost[0].hostaddr) + goto oom_error; + } + if (originalHost.port) + { + cancelConn->connhost[0].port = strdup(originalHost.port); + if (!cancelConn->connhost[0].port) + goto oom_error; + } + if (originalHost.password) + { + cancelConn->connhost[0].password = strdup(originalHost.password); + if (!cancelConn->connhost[0].password) + goto oom_error; + } + + cancelConn->addr = calloc(cancelConn->naddr, sizeof(AddrInfo)); + if (!cancelConn->connhost) + goto oom_error; + + cancelConn->addr[0].addr = conn->raddr; + cancelConn->addr[0].family = conn->raddr.addr.ss_family; + + cancelConn->status = CONNECTION_STARTING; + return (PGcancelConn *) cancelConn; + +oom_error: + conn->status = CONNECTION_BAD; + libpq_append_conn_error(cancelConn, "out of memory"); + return (PGcancelConn *) cancelConn; +} + /* * PQconnectStartParams * @@ -907,6 +1025,45 @@ fillPGconn(PGconn *conn, PQconninfoOption *connOptions) return true; } +/* + * Copy over option values from srcConn to dstConn + * + * Don't put anything cute here --- intelligence should be in + * connectOptions2 ... + * + * Returns true on success. On failure, returns false and sets error message of + * dstConn. + */ +static bool +copyPGconn(PGconn *srcConn, PGconn *dstConn) +{ + const internalPQconninfoOption *option; + + /* copy over connection options */ + for (option = PQconninfoOptions; option->keyword; option++) + { + if (option->connofs >= 0) + { + const char **tmp = (const char **) ((char *) srcConn + option->connofs); + + if (*tmp) + { + char **dstConnmember = (char **) ((char *) dstConn + option->connofs); + + if (*dstConnmember) + free(*dstConnmember); + *dstConnmember = strdup(*tmp); + if (*dstConnmember == NULL) + { + libpq_append_conn_error(dstConn, "out of memory"); + return false; + } + } + } + } + return true; +} + /* * connectOptions1 * @@ -2031,10 +2188,18 @@ connectDBStart(PGconn *conn) * Set up to try to connect to the first host. (Setting whichhost = -1 is * a bit of a cheat, but PQconnectPoll will advance it to 0 before * anything else looks at it.) + * + * Cancel requests are special though, they should only try one host and + * address. These fields have already set up in PQcancelConn. So leave + * these fields alone for cancel requests. */ - conn->whichhost = -1; - conn->try_next_addr = false; - conn->try_next_host = true; + if (!conn->cancelRequest) + { + conn->whichhost = -1; + conn->try_next_host = true; + conn->try_next_addr = false; + } + conn->status = CONNECTION_NEEDED; /* Also reset the target_server_type state if needed */ @@ -2176,7 +2341,10 @@ connectDBComplete(PGconn *conn) /* * Now try to advance the state machine. */ - flag = PQconnectPoll(conn); + if (conn->cancelRequest) + flag = PQcancelPoll((PGcancelConn *) conn); + else + flag = PQconnectPoll(conn); } } @@ -2301,13 +2469,17 @@ keep_going: /* We will come back to here until there is * Oops, no more hosts. * * If we are trying to connect in "prefer-standby" mode, then drop - * the standby requirement and start over. + * the standby requirement and start over. Don't do this for + * cancel requests though, since we are certain the list of + * servers won't change as the target_server_type option is not + * applicable to those connections. * * Otherwise, an appropriate error message is already set up, so * we just need to set the right status. */ if (conn->target_server_type == SERVER_TYPE_PREFER_STANDBY && - conn->nconnhost > 0) + conn->nconnhost > 0 && + !conn->cancelRequest) { conn->target_server_type = SERVER_TYPE_PREFER_STANDBY_PASS2; conn->whichhost = 0; @@ -2898,6 +3070,29 @@ keep_going: /* We will come back to here until there is } #endif /* USE_SSL */ + /* + * For cancel requests this is as far as we need to go in the + * connection establishment. Now we can actually send our + * cancelation request. + */ + if (conn->cancelRequest) + { + CancelRequestPacket cancelpacket; + + packetlen = sizeof(cancelpacket); + cancelpacket.cancelRequestCode = (MsgType) pg_hton32(CANCEL_REQUEST_CODE); + cancelpacket.backendPID = pg_hton32(conn->be_pid); + cancelpacket.cancelAuthCode = pg_hton32(conn->be_key); + if (pqPacketSend(conn, 0, &cancelpacket, packetlen) != STATUS_OK) + { + libpq_append_conn_error(conn, "could not send cancel packet: %s", + SOCK_STRERROR(SOCK_ERRNO, sebuf, sizeof(sebuf))); + goto error_return; + } + conn->status = CONNECTION_AWAITING_RESPONSE; + return PGRES_POLLING_READING; + } + /* * Build the startup packet. */ @@ -3610,8 +3805,14 @@ keep_going: /* We will come back to here until there is } } - /* We can release the address list now. */ - release_conn_addrinfo(conn); + /* + * For non cancel requests we can release the address list + * now. For cancel requests we never actually resolve + * addresses and instead the addrinfo exists for the lifetime + * of the connection. + */ + if (!conn->cancelRequest) + release_conn_addrinfo(conn); /* * Contents of conn->errorMessage are no longer interesting @@ -3978,19 +4179,8 @@ freePGconn(PGconn *conn) free(conn->events[i].name); } - /* clean up pg_conn_host structures */ - for (int i = 0; i < conn->nconnhost; ++i) - { - free(conn->connhost[i].host); - free(conn->connhost[i].hostaddr); - free(conn->connhost[i].port); - if (conn->connhost[i].password != NULL) - { - explicit_bzero(conn->connhost[i].password, strlen(conn->connhost[i].password)); - free(conn->connhost[i].password); - } - } - free(conn->connhost); + release_conn_addrinfo(conn); + release_conn_hosts(conn); free(conn->client_encoding_initial); free(conn->events); @@ -4101,6 +4291,31 @@ release_conn_addrinfo(PGconn *conn) } } +/* + * release_conn_hosts + * - Free the host list in the PGconn. + */ +static void +release_conn_hosts(PGconn *conn) +{ + if (conn->connhost) + { + for (int i = 0; i < conn->nconnhost; ++i) + { + free(conn->connhost[i].host); + free(conn->connhost[i].hostaddr); + free(conn->connhost[i].port); + if (conn->connhost[i].password != NULL) + { + explicit_bzero(conn->connhost[i].password, strlen(conn->connhost[i].password)); + free(conn->connhost[i].password); + } + } + free(conn->connhost); + } +} + + /* * sendTerminateConn * - Send a terminate message to backend. @@ -4108,6 +4323,15 @@ release_conn_addrinfo(PGconn *conn) static void sendTerminateConn(PGconn *conn) { + /* + * The Postgres cancellation protocol does not have a notion of a + * Terminate message, so don't send one. + */ + if (conn->cancelRequest) + { + return; + } + /* * Note that the protocol doesn't allow us to send Terminate messages * during the startup phase. @@ -4161,7 +4385,13 @@ closePGconn(PGconn *conn) conn->pipelineStatus = PQ_PIPELINE_OFF; pqClearAsyncResult(conn); /* deallocate result */ pqClearConnErrorState(conn); - release_conn_addrinfo(conn); + + /* + * Since cancel requests never change their addrinfo we don't free it + * here. Otherwise we would have to rebuild it during a PQcancelReset. + */ + if (!conn->cancelRequest) + release_conn_addrinfo(conn); /* Reset all state obtained from server, too */ pqDropServerData(conn); @@ -4576,6 +4806,180 @@ cancel_errReturn: return false; } +/* + * PQcancelSend + * + * Send a cancellation request in a blocking fashion. + */ +PGcancelConn * +PQcancelSend(PGconn *conn) +{ + PGcancelConn *cancelConn = PQcancelConn(conn); + + if (!cancelConn || cancelConn->conn.status == CONNECTION_BAD) + return cancelConn; + + if (!connectDBStart(&cancelConn->conn)) + { + cancelConn->conn.status = CONNECTION_BAD; + return cancelConn; + } + + (void) connectDBComplete(&cancelConn->conn); + + return cancelConn; +} + +/* + * PQcancelPoll + * + * Poll a cancel connection. For usage details see PQconnectPoll. + */ +PostgresPollingStatusType +PQcancelPoll(PGcancelConn * cancelConn) +{ + PGconn *conn = (PGconn *) cancelConn; + int n; + + /* + * Before we can call PQconnectPoll we first need to start the connection + * using connectDBstart. Non-cancel connections already do this whenever + * the connection is initialized. But cancel connections wait until the + * caller starts polling, because there might be a large delay between + * creating a cancel connection and actually wanting to use it. + */ + if (conn->status == CONNECTION_STARTING) + { + if (!connectDBStart(&cancelConn->conn)) + { + cancelConn->conn.status = CONNECTION_STARTED; + return PGRES_POLLING_WRITING; + } + } + + /* + * The rest of the connection establishement we leave to PQconnectPoll, + * since it's very similar to normal connection establishment. But once we + * get to the CONNECTION_AWAITING_RESPONSE we need to do our own thing. + */ + if (conn->status != CONNECTION_AWAITING_RESPONSE) + { + return PQconnectPoll(conn); + } + + /* + * At this point we are waiting on the server to close the connection, + * which is its way of communicating that the cancel has been handled. + */ + + n = pqReadData(conn); + + if (n == 0) + return PGRES_POLLING_READING; + +#ifndef WIN32 + + /* + * Windows is a bit special in its EOF behaviour for TCP. Sometimes it + * will error with an ECONNRESET when there is a clean connection closure. + * See these threads for details: + * https://www.postgresql.org/message-id/flat/90b34057-4176-7bb0-0dbb-9822a5f6425b%40greiz-reinsdorf.de + * + * https://www.postgresql.org/message-id/flat/CA%2BhUKG%2BOeoETZQ%3DQw5Ub5h3tmwQhBmDA%3DnuNO3KG%3DzWfUypFAw%40mail.gmail.com + * + * PQcancel ignores such errors and reports success for the cancellation + * anyway, so even if this is not always correct we do the same here. For + * all other OSes we consider any other error than EOF and report it as + * such. + */ + if (n < 0 && n != -2) + { + conn->status = CONNECTION_BAD; + return PGRES_POLLING_FAILED; + } +#endif + + /* + * We don't expect any data, only connection closure. So if we strangly do + * receive some data we consider that an error. + */ + if (n > 0) + { + + libpq_append_conn_error(conn, "received unexpected response from server"); + conn->status = CONNECTION_BAD; + return PGRES_POLLING_FAILED; + } + + /* + * Getting here means that we received an EOF. Which is what we were + * expecting. The cancel request has completed. + */ + cancelConn->conn.status = CONNECTION_OK; + resetPQExpBuffer(&conn->errorMessage); + return PGRES_POLLING_OK; +} + +/* + * PQcancelStatus + * + * Get the status of a cancel connection. + */ +ConnStatusType +PQcancelStatus(const PGcancelConn * cancelConn) +{ + return PQstatus((const PGconn *) cancelConn); +} + +/* + * PQcancelSocket + * + * Get the socket of the cancel connection. + */ +int +PQcancelSocket(const PGcancelConn * cancelConn) +{ + return PQsocket((const PGconn *) cancelConn); +} + +/* + * PQcancelErrorMessage + * + * Get the socket of the cancel connection. + */ +char * +PQcancelErrorMessage(const PGcancelConn * cancelConn) +{ + return PQerrorMessage((const PGconn *) cancelConn); +} + +/* + * PQcancelReset + * + * Resets the cancel connection, so it can be reused to send a new cancel + * request. + */ +void +PQcancelReset(PGcancelConn * cancelConn) +{ + closePGconn((PGconn *) cancelConn); + cancelConn->conn.status = CONNECTION_STARTING; + cancelConn->conn.whichhost = 0; + cancelConn->conn.whichaddr = 0; + cancelConn->conn.try_next_host = false; + cancelConn->conn.try_next_addr = false; +} + +/* + * PQcancelFinish + * + * Closes and frees the cancel connection. + */ +void +PQcancelFinish(PGcancelConn * cancelConn) +{ + PQfinish((PGconn *) cancelConn); +} /* * PQrequestCancel: old, not thread-safe function for requesting query cancel diff --git a/src/interfaces/libpq/libpq-fe.h b/src/interfaces/libpq/libpq-fe.h index f3d9220496..95899b9f55 100644 --- a/src/interfaces/libpq/libpq-fe.h +++ b/src/interfaces/libpq/libpq-fe.h @@ -78,7 +78,9 @@ typedef enum CONNECTION_CONSUME, /* Consuming any extra messages. */ CONNECTION_GSS_STARTUP, /* Negotiating GSSAPI. */ CONNECTION_CHECK_TARGET, /* Checking target server properties. */ - CONNECTION_CHECK_STANDBY /* Checking if server is in standby mode. */ + CONNECTION_CHECK_STANDBY, /* Checking if server is in standby mode. */ + CONNECTION_STARTING /* Waiting for connection attempt to be + * started. */ } ConnStatusType; typedef enum @@ -165,6 +167,11 @@ typedef enum */ typedef struct pg_conn PGconn; +/* PGcancelConn encapsulates a cancel connection to the backend. + * The contents of this struct are not supposed to be known to applications. + */ +typedef struct pg_cancel_conn PGcancelConn; + /* PGresult encapsulates the result of a query (or more precisely, of a single * SQL command --- a query string given to PQsendQuery can contain multiple * commands and thus return multiple PGresult objects). @@ -321,16 +328,28 @@ extern PostgresPollingStatusType PQresetPoll(PGconn *conn); /* Synchronous (blocking) */ extern void PQreset(PGconn *conn); +/* issue a cancel request */ +extern PGcancelConn * PQcancelSend(PGconn *conn); +/* non-blocking version of PQcancelSend */ +extern PGcancelConn * PQcancelConn(PGconn *conn); +extern PostgresPollingStatusType PQcancelPoll(PGcancelConn * cancelConn); +extern ConnStatusType PQcancelStatus(const PGcancelConn * cancelConn); +extern int PQcancelSocket(const PGcancelConn * cancelConn); +extern char *PQcancelErrorMessage(const PGcancelConn * cancelConn); +extern void PQcancelReset(PGcancelConn * cancelConn); +extern void PQcancelFinish(PGcancelConn * cancelConn); + + /* request a cancel structure */ extern PGcancel *PQgetCancel(PGconn *conn); /* free a cancel structure */ extern void PQfreeCancel(PGcancel *cancel); -/* issue a cancel request */ +/* a less secure version of PQcancelSend, but one which is signal-safe */ extern int PQcancel(PGcancel *cancel, char *errbuf, int errbufsize); -/* backwards compatible version of PQcancel; not thread-safe */ +/* deprecated version of PQcancel; not thread-safe */ extern int PQrequestCancel(PGconn *conn); /* Accessor functions for PGconn objects */ diff --git a/src/interfaces/libpq/libpq-int.h b/src/interfaces/libpq/libpq-int.h index 940db7ecc8..cd1857ea49 100644 --- a/src/interfaces/libpq/libpq-int.h +++ b/src/interfaces/libpq/libpq-int.h @@ -397,6 +397,10 @@ struct pg_conn char *ssl_max_protocol_version; /* maximum TLS protocol version */ char *target_session_attrs; /* desired session properties */ + bool cancelRequest; /* true if this connection is used to send a + * cancel request, instead of being a normal + * connection that's used for queries */ + /* Optional file to write trace info to */ FILE *Pfdebug; int traceFlags; @@ -594,6 +598,11 @@ struct pg_conn PQExpBufferData workBuffer; /* expansible string */ }; +struct pg_cancel_conn +{ + PGconn conn; +}; + /* PGcancel stores all data necessary to cancel a connection. A copy of this * data is required to safely cancel a connection running on a different * thread. diff --git a/src/test/modules/libpq_pipeline/libpq_pipeline.c b/src/test/modules/libpq_pipeline/libpq_pipeline.c index 6111bf9b67..6d88d419a6 100644 --- a/src/test/modules/libpq_pipeline/libpq_pipeline.c +++ b/src/test/modules/libpq_pipeline/libpq_pipeline.c @@ -86,6 +86,264 @@ pg_fatal_impl(int line, const char *fmt,...) exit(1); } +/* + * Check that the query on the given connection got cancelled. + * + * This is a function wrapped in a macrco to make the reported line number + * in an error match the line number of the invocation. + */ +#define confirm_query_cancelled(conn) confirm_query_cancelled_impl(__LINE__, conn) +static void +confirm_query_cancelled_impl(int line, PGconn *conn) +{ + PGresult *res = NULL; + + res = PQgetResult(conn); + if (res == NULL) + pg_fatal_impl(line, "PQgetResult returned null: %s", + PQerrorMessage(conn)); + if (PQresultStatus(res) != PGRES_FATAL_ERROR) + pg_fatal_impl(line, "query did not fail when it was expected"); + if (strcmp(PQresultErrorField(res, PG_DIAG_SQLSTATE), "57014") != 0) + pg_fatal_impl(line, "query failed with a different error than cancellation: %s", + PQerrorMessage(conn)); + PQclear(res); + while (PQisBusy(conn)) + { + PQconsumeInput(conn); + } +} + +#define send_cancellable_query(conn, monitorConn) send_cancellable_query_impl(__LINE__, conn, monitorConn) +static void +send_cancellable_query_impl(int line, PGconn *conn, PGconn *monitorConn) +{ + const char *env_wait; + const Oid paramTypes[1] = {INT4OID}; + + env_wait = getenv("PG_TEST_TIMEOUT_DEFAULT"); + if (env_wait == NULL) + env_wait = "180"; + + if (PQsendQueryParams(conn, "SELECT pg_sleep($1)", 1, paramTypes, &env_wait, NULL, NULL, 0) != 1) + pg_fatal_impl(line, "failed to send query: %s", PQerrorMessage(conn)); + + /* + * Wait until the query is actually running. Otherwise sending a + * cancellation request might not cancel the query due to race conditions. + */ + while (true) + { + char *value = NULL; + PGresult *res = PQexec( + monitorConn, + "SELECT count(*) FROM pg_stat_activity WHERE " + "query = 'SELECT pg_sleep($1)' " + "AND state = 'active'"); + + if (PQresultStatus(res) != PGRES_TUPLES_OK) + { + pg_fatal("Connection to database failed: %s", PQerrorMessage(monitorConn)); + } + if (PQntuples(res) != 1) + { + pg_fatal("unexpected number of rows received: %d", PQntuples(res)); + } + if (PQnfields(res) != 1) + { + pg_fatal("unexpected number of columns received: %d", PQnfields(res)); + } + value = PQgetvalue(res, 0, 0); + if (*value != '0') + { + PQclear(res); + break; + } + PQclear(res); + + /* + * wait 10ms before polling again + */ + pg_usleep(10000); + } +} + +static void +test_cancel(PGconn *conn, const char *conninfo) +{ + PGcancel *cancel = NULL; + PGcancelConn *cancelConn = NULL; + PGconn *monitorConn = NULL; + char errorbuf[256]; + + fprintf(stderr, "test cancellations... "); + + if (PQsetnonblocking(conn, 1) != 0) + pg_fatal("failed to set nonblocking mode: %s", PQerrorMessage(conn)); + + /* + * Make a connection to the database to monitor the query on the main + * connection. + */ + monitorConn = PQconnectdb(conninfo); + if (PQstatus(conn) != CONNECTION_OK) + { + pg_fatal("Connection to database failed: %s", + PQerrorMessage(conn)); + } + + /* test PQcancel */ + send_cancellable_query(conn, monitorConn); + cancel = PQgetCancel(conn); + if (!PQcancel(cancel, errorbuf, sizeof(errorbuf))) + { + pg_fatal("failed to run PQcancel: %s", errorbuf); + }; + confirm_query_cancelled(conn); + + /* PGcancel object can be reused for the next query */ + send_cancellable_query(conn, monitorConn); + if (!PQcancel(cancel, errorbuf, sizeof(errorbuf))) + { + pg_fatal("failed to run PQcancel: %s", errorbuf); + }; + confirm_query_cancelled(conn); + + PQfreeCancel(cancel); + + /* test PQrequestCancel */ + send_cancellable_query(conn, monitorConn); + if (!PQrequestCancel(conn)) + pg_fatal("failed to run PQrequestCancel: %s", PQerrorMessage(conn)); + confirm_query_cancelled(conn); + + /* test PQcancelSend */ + send_cancellable_query(conn, monitorConn); + cancelConn = PQcancelSend(conn); + if (PQcancelStatus(cancelConn) == CONNECTION_BAD) + pg_fatal("failed to run PQcancelSend: %s", PQcancelErrorMessage(cancelConn)); + confirm_query_cancelled(conn); + PQcancelFinish(cancelConn); + + /* test PQcancelConn and then polling with PQcancelPoll */ + send_cancellable_query(conn, monitorConn); + cancelConn = PQcancelConn(conn); + if (PQcancelStatus(cancelConn) == CONNECTION_BAD) + pg_fatal("bad cancel connection: %s", PQcancelErrorMessage(cancelConn)); + while (true) + { + struct timeval tv; + fd_set input_mask; + fd_set output_mask; + PostgresPollingStatusType pollres = PQcancelPoll(cancelConn); + int sock = PQcancelSocket(cancelConn); + + if (pollres == PGRES_POLLING_OK) + { + break; + } + + FD_ZERO(&input_mask); + FD_ZERO(&output_mask); + switch (pollres) + { + case PGRES_POLLING_READING: + pg_debug("polling for reads\n"); + FD_SET(sock, &input_mask); + break; + case PGRES_POLLING_WRITING: + pg_debug("polling for writes\n"); + FD_SET(sock, &output_mask); + break; + default: + pg_fatal("bad cancel connection: %s", PQcancelErrorMessage(cancelConn)); + } + + if (sock < 0) + pg_fatal("sock did not exist: %s", PQcancelErrorMessage(cancelConn)); + + tv.tv_sec = 3; + tv.tv_usec = 0; + + while (true) + { + if (select(sock + 1, &input_mask, &output_mask, NULL, &tv) < 0) + { + if (errno == EINTR) + continue; + pg_fatal("select() failed: %m"); + } + break; + } + } + if (PQcancelStatus(cancelConn) != CONNECTION_OK) + pg_fatal("unexpected cancel connection status: %s", PQcancelErrorMessage(cancelConn)); + confirm_query_cancelled(conn); + + /* + * test PQcancelReset works on the cancel connection and it can be reused + * after + */ + PQcancelReset(cancelConn); + + send_cancellable_query(conn, monitorConn); + if (PQcancelStatus(cancelConn) == CONNECTION_BAD) + pg_fatal("bad cancel connection: %s", PQcancelErrorMessage(cancelConn)); + while (true) + { + struct timeval tv; + fd_set input_mask; + fd_set output_mask; + PostgresPollingStatusType pollres = PQcancelPoll(cancelConn); + int sock = PQcancelSocket(cancelConn); + + if (pollres == PGRES_POLLING_OK) + { + break; + } + + FD_ZERO(&input_mask); + FD_ZERO(&output_mask); + switch (pollres) + { + case PGRES_POLLING_READING: + pg_debug("polling for reads\n"); + FD_SET(sock, &input_mask); + break; + case PGRES_POLLING_WRITING: + pg_debug("polling for writes\n"); + FD_SET(sock, &output_mask); + break; + default: + pg_fatal("bad cancel connection: %s", PQcancelErrorMessage(cancelConn)); + } + + if (sock < 0) + pg_fatal("sock did not exist: %s", PQcancelErrorMessage(cancelConn)); + + tv.tv_sec = 3; + tv.tv_usec = 0; + + while (true) + { + if (select(sock + 1, &input_mask, &output_mask, NULL, &tv) < 0) + { + if (errno == EINTR) + continue; + pg_fatal("select() failed: %m"); + } + break; + } + } + if (PQcancelStatus(cancelConn) != CONNECTION_OK) + pg_fatal("unexpected cancel connection status: %s", PQcancelErrorMessage(cancelConn)); + confirm_query_cancelled(conn); + + PQcancelFinish(cancelConn); + + fprintf(stderr, "ok\n"); +} + static void test_disallowed_in_pipeline(PGconn *conn) { @@ -985,7 +1243,7 @@ test_prepared(PGconn *conn) static void notice_processor(void *arg, const char *message) { - int *n_notices = (int *) arg; + int *n_notices = (int *) arg; (*n_notices)++; fprintf(stderr, "NOTICE %d: %s", *n_notices, message); @@ -1681,6 +1939,7 @@ usage(const char *progname) static void print_test_list(void) { + printf("cancel\n"); printf("disallowed_in_pipeline\n"); printf("multi_pipelines\n"); printf("nosync\n"); @@ -1782,7 +2041,9 @@ main(int argc, char **argv) PQTRACE_SUPPRESS_TIMESTAMPS | PQTRACE_REGRESS_MODE); } - if (strcmp(testname, "disallowed_in_pipeline") == 0) + if (strcmp(testname, "cancel") == 0) + test_cancel(conn, conninfo); + else if (strcmp(testname, "disallowed_in_pipeline") == 0) test_disallowed_in_pipeline(conn); else if (strcmp(testname, "multi_pipelines") == 0) test_multi_pipelines(conn); -- 2.34.1