From ff3c079c349028de52be3a12e12e78e946c28bdd Mon Sep 17 00:00:00 2001 From: Jelte Fennema Date: Wed, 25 Jan 2023 13:32:15 +0100 Subject: [PATCH v12 5/5] Start using new libpq cancel APIs A previous commit introduced new APIs to libpq for cancelling queries. This replaces the usage of the old APIs in the codebase with these newer ones. --- contrib/dblink/dblink.c | 30 ++++-- contrib/postgres_fdw/connection.c | 99 ++++++++++++++++--- .../postgres_fdw/expected/postgres_fdw.out | 15 +++ contrib/postgres_fdw/sql/postgres_fdw.sql | 7 ++ src/fe_utils/connect_utils.c | 10 +- src/test/isolation/isolationtester.c | 29 +++--- 6 files changed, 139 insertions(+), 51 deletions(-) diff --git a/contrib/dblink/dblink.c b/contrib/dblink/dblink.c index 78a8bcee6e3..e139f66e116 100644 --- a/contrib/dblink/dblink.c +++ b/contrib/dblink/dblink.c @@ -1326,22 +1326,32 @@ PG_FUNCTION_INFO_V1(dblink_cancel_query); Datum dblink_cancel_query(PG_FUNCTION_ARGS) { - int res; PGconn *conn; - PGcancel *cancel; - char errbuf[256]; + PGcancelConn *cancelConn; + char *msg; dblink_init(); conn = dblink_get_named_conn(text_to_cstring(PG_GETARG_TEXT_PP(0))); - cancel = PQgetCancel(conn); + cancelConn = PQcancelSend(conn); - res = PQcancel(cancel, errbuf, 256); - PQfreeCancel(cancel); + PG_TRY(); + { + if (PQcancelStatus(cancelConn) == CONNECTION_BAD) + { + msg = pchomp(PQcancelErrorMessage(cancelConn)); + } + else + { + msg = "OK"; + } + } + PG_FINALLY(); + { + PQcancelFinish(cancelConn); + } + PG_END_TRY(); - if (res == 1) - PG_RETURN_TEXT_P(cstring_to_text("OK")); - else - PG_RETURN_TEXT_P(cstring_to_text(errbuf)); + PG_RETURN_TEXT_P(cstring_to_text(msg)); } diff --git a/contrib/postgres_fdw/connection.c b/contrib/postgres_fdw/connection.c index 12b54f15cd6..bc3e5181683 100644 --- a/contrib/postgres_fdw/connection.c +++ b/contrib/postgres_fdw/connection.c @@ -1234,35 +1234,104 @@ pgfdw_reset_xact_state(ConnCacheEntry *entry, bool toplevel) static bool pgfdw_cancel_query(PGconn *conn) { - PGcancel *cancel; - char errbuf[256]; PGresult *result = NULL; - TimestampTz endtime; - bool timed_out; /* * If it takes too long to cancel the query and discard the result, assume * the connection is dead. */ - endtime = TimestampTzPlusMilliseconds(GetCurrentTimestamp(), 30000); + TimestampTz endtime = TimestampTzPlusMilliseconds(GetCurrentTimestamp(), 30000); + bool timed_out = false; + bool failed = false; + PGcancelConn *cancel_conn = PQcancelConn(conn); - /* - * Issue cancel request. Unfortunately, there's no good way to limit the - * amount of time that we might block inside PQgetCancel(). - */ - if ((cancel = PQgetCancel(conn))) + + if (PQcancelStatus(cancel_conn) == CONNECTION_BAD) { - if (!PQcancel(cancel, errbuf, sizeof(errbuf))) + PG_TRY(); { ereport(WARNING, (errcode(ERRCODE_CONNECTION_FAILURE), errmsg("could not send cancel request: %s", - errbuf))); - PQfreeCancel(cancel); - return false; + pchomp(PQcancelErrorMessage(cancel_conn))))); + } + PG_FINALLY(); + { + PQcancelFinish(cancel_conn); + } + PG_END_TRY(); + return false; + } + + /* In what follows, do not leak any PGcancelConn on an error. */ + PG_TRY(); + { + while (true) + { + TimestampTz now = GetCurrentTimestamp(); + long cur_timeout; + PostgresPollingStatusType pollres = PQcancelPoll(cancel_conn); + int waitEvents = WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH; + + if (pollres == PGRES_POLLING_OK) + { + break; + } + + /* If timeout has expired, give up, else get sleep time. */ + cur_timeout = TimestampDifferenceMilliseconds(now, endtime); + if (cur_timeout <= 0) + { + timed_out = true; + failed = true; + goto exit; + } + + switch (pollres) + { + case PGRES_POLLING_READING: + waitEvents |= WL_SOCKET_READABLE; + break; + case PGRES_POLLING_WRITING: + waitEvents |= WL_SOCKET_WRITEABLE; + break; + default: + failed = true; + goto exit; + } + + /* Sleep until there's something to do */ + WaitLatchOrSocket(MyLatch, waitEvents, PQcancelSocket(cancel_conn), + cur_timeout, PG_WAIT_EXTENSION); + ResetLatch(MyLatch); + + CHECK_FOR_INTERRUPTS(); + } +exit: ; + if (failed) + { + if (timed_out) + { + ereport(WARNING, + (errmsg("could not cancel request due to timeout"))); + } + else + { + ereport(WARNING, + (errcode(ERRCODE_CONNECTION_FAILURE), + errmsg("could not send cancel request: %s", + pchomp(PQcancelErrorMessage(cancel_conn))))); + } } - PQfreeCancel(cancel); } + PG_FINALLY(); + { + PQcancelFinish(cancel_conn); + } + PG_END_TRY(); + + if (failed) + return false; /* Get and discard the result of the query. */ if (pgfdw_get_cleanup_result(conn, endtime, &result, &timed_out)) diff --git a/contrib/postgres_fdw/expected/postgres_fdw.out b/contrib/postgres_fdw/expected/postgres_fdw.out index 04a3ef450cf..064c3103a5e 100644 --- a/contrib/postgres_fdw/expected/postgres_fdw.out +++ b/contrib/postgres_fdw/expected/postgres_fdw.out @@ -2688,6 +2688,21 @@ SELECT t1.c1, t2.c2 FROM v4 t1 LEFT JOIN ft5 t2 ON (t1.c1 = t2.c1) ORDER BY t1.c (10 rows) ALTER VIEW v4 OWNER TO regress_view_owner; +-- Make sure this big CROSS JOIN query is pushed down +EXPLAIN (VERBOSE, COSTS OFF) SELECT count(*) FROM ft1 CROSS JOIN ft2 CROSS JOIN ft4 CROSS JOIN ft5; + QUERY PLAN +--------------------------------------------------------------------------------------------------------------------------------------------------------------------- + Foreign Scan + Output: (count(*)) + Relations: Aggregate on ((((public.ft1) INNER JOIN (public.ft2)) INNER JOIN (public.ft4)) INNER JOIN (public.ft5)) + Remote SQL: SELECT count(*) FROM ((("S 1"."T 1" r1 INNER JOIN "S 1"."T 1" r2 ON (TRUE)) INNER JOIN "S 1"."T 3" r4 ON (TRUE)) INNER JOIN "S 1"."T 4" r6 ON (TRUE)) +(4 rows) + +-- Make sure query cancellation works +SET statement_timeout = '10ms'; +select count(*) from ft1 CROSS JOIN ft2 CROSS JOIN ft4 CROSS JOIN ft5; -- this takes very long +ERROR: canceling statement due to statement timeout +RESET statement_timeout; -- cleanup DROP OWNED BY regress_view_owner; DROP ROLE regress_view_owner; diff --git a/contrib/postgres_fdw/sql/postgres_fdw.sql b/contrib/postgres_fdw/sql/postgres_fdw.sql index 4f3088c03ea..640958df136 100644 --- a/contrib/postgres_fdw/sql/postgres_fdw.sql +++ b/contrib/postgres_fdw/sql/postgres_fdw.sql @@ -713,6 +713,13 @@ SELECT t1.c1, t2.c2 FROM v4 t1 LEFT JOIN ft5 t2 ON (t1.c1 = t2.c1) ORDER BY t1.c SELECT t1.c1, t2.c2 FROM v4 t1 LEFT JOIN ft5 t2 ON (t1.c1 = t2.c1) ORDER BY t1.c1, t2.c1 OFFSET 10 LIMIT 10; ALTER VIEW v4 OWNER TO regress_view_owner; +-- Make sure this big CROSS JOIN query is pushed down +EXPLAIN (VERBOSE, COSTS OFF) SELECT count(*) FROM ft1 CROSS JOIN ft2 CROSS JOIN ft4 CROSS JOIN ft5; +-- Make sure query cancellation works +SET statement_timeout = '10ms'; +select count(*) from ft1 CROSS JOIN ft2 CROSS JOIN ft4 CROSS JOIN ft5; -- this takes very long +RESET statement_timeout; + -- cleanup DROP OWNED BY regress_view_owner; DROP ROLE regress_view_owner; diff --git a/src/fe_utils/connect_utils.c b/src/fe_utils/connect_utils.c index 7a1edea7c8c..b32448c0103 100644 --- a/src/fe_utils/connect_utils.c +++ b/src/fe_utils/connect_utils.c @@ -157,19 +157,11 @@ connectMaintenanceDatabase(ConnParams *cparams, void disconnectDatabase(PGconn *conn) { - char errbuf[256]; - Assert(conn != NULL); if (PQtransactionStatus(conn) == PQTRANS_ACTIVE) { - PGcancel *cancel; - - if ((cancel = PQgetCancel(conn))) - { - (void) PQcancel(cancel, errbuf, sizeof(errbuf)); - PQfreeCancel(cancel); - } + PQcancelFinish(PQcancelSend(conn)); } PQfinish(conn); diff --git a/src/test/isolation/isolationtester.c b/src/test/isolation/isolationtester.c index 0a66235153a..3781f7982b2 100644 --- a/src/test/isolation/isolationtester.c +++ b/src/test/isolation/isolationtester.c @@ -946,26 +946,21 @@ try_complete_step(TestSpec *testspec, PermutationStep *pstep, int flags) */ if (td > max_step_wait && !canceled) { - PGcancel *cancel = PQgetCancel(conn); + PGcancelConn *cancel_conn = PQcancelSend(conn); - if (cancel != NULL) + if (PQcancelStatus(cancel_conn) == CONNECTION_OK) { - char buf[256]; - - if (PQcancel(cancel, buf, sizeof(buf))) - { - /* - * print to stdout not stderr, as this should appear - * in the test case's results - */ - printf("isolationtester: canceling step %s after %d seconds\n", - step->name, (int) (td / USECS_PER_SEC)); - canceled = true; - } - else - fprintf(stderr, "PQcancel failed: %s\n", buf); - PQfreeCancel(cancel); + /* + * print to stdout not stderr, as this should appear in + * the test case's results + */ + printf("isolationtester: canceling step %s after %d seconds\n", + step->name, (int) (td / USECS_PER_SEC)); + canceled = true; } + else + fprintf(stderr, "PQcancel failed: %s\n", PQcancelErrorMessage(cancel_conn)); + PQcancelFinish(cancel_conn); } /* -- 2.34.1