From 2f23b515aa31a0b524db48b68d254bf96dbfe265 Mon Sep 17 00:00:00 2001 From: Jelte Fennema-Nio Date: Thu, 14 Dec 2023 13:39:09 +0100 Subject: [PATCH v34 2/2] Start using new libpq cancel APIs A previous commit introduced new APIs to libpq for cancelling queries. This replaces the usage of the old APIs in most of the codebase with these newer ones. This specifically leaves out changes to psql and pgbench as those would need a much larger refactor to be able to call them, due to the new functions not being signal-safe. --- contrib/dblink/dblink.c | 30 +++-- contrib/postgres_fdw/connection.c | 105 +++++++++++++++--- .../postgres_fdw/expected/postgres_fdw.out | 15 +++ contrib/postgres_fdw/sql/postgres_fdw.sql | 7 ++ src/fe_utils/connect_utils.c | 11 +- src/test/isolation/isolationtester.c | 29 ++--- 6 files changed, 145 insertions(+), 52 deletions(-) diff --git a/contrib/dblink/dblink.c b/contrib/dblink/dblink.c index 19a362526d..98dcca3e6f 100644 --- a/contrib/dblink/dblink.c +++ b/contrib/dblink/dblink.c @@ -1346,22 +1346,32 @@ PG_FUNCTION_INFO_V1(dblink_cancel_query); Datum dblink_cancel_query(PG_FUNCTION_ARGS) { - int res; PGconn *conn; - PGcancel *cancel; - char errbuf[256]; + PGcancelConn *cancelConn; + char *msg; dblink_init(); conn = dblink_get_named_conn(text_to_cstring(PG_GETARG_TEXT_PP(0))); - cancel = PQgetCancel(conn); + cancelConn = PQcancelCreate(conn); - res = PQcancel(cancel, errbuf, 256); - PQfreeCancel(cancel); + PG_TRY(); + { + if (!PQcancelBlocking(cancelConn)) + { + msg = pchomp(PQcancelErrorMessage(cancelConn)); + } + else + { + msg = "OK"; + } + } + PG_FINALLY(); + { + PQcancelFinish(cancelConn); + } + PG_END_TRY(); - if (res == 1) - PG_RETURN_TEXT_P(cstring_to_text("OK")); - else - PG_RETURN_TEXT_P(cstring_to_text(errbuf)); + PG_RETURN_TEXT_P(cstring_to_text(msg)); } diff --git a/contrib/postgres_fdw/connection.c b/contrib/postgres_fdw/connection.c index 4931ebf591..dcc13dc3b2 100644 --- a/contrib/postgres_fdw/connection.c +++ b/contrib/postgres_fdw/connection.c @@ -133,7 +133,7 @@ static void pgfdw_inval_callback(Datum arg, int cacheid, uint32 hashvalue); static void pgfdw_reject_incomplete_xact_state_change(ConnCacheEntry *entry); static void pgfdw_reset_xact_state(ConnCacheEntry *entry, bool toplevel); static bool pgfdw_cancel_query(PGconn *conn); -static bool pgfdw_cancel_query_begin(PGconn *conn); +static bool pgfdw_cancel_query_begin(PGconn *conn, TimestampTz endtime); static bool pgfdw_cancel_query_end(PGconn *conn, TimestampTz endtime, bool consume_input); static bool pgfdw_exec_cleanup_query(PGconn *conn, const char *query, @@ -1315,36 +1315,104 @@ pgfdw_cancel_query(PGconn *conn) endtime = TimestampTzPlusMilliseconds(GetCurrentTimestamp(), CONNECTION_CLEANUP_TIMEOUT); - if (!pgfdw_cancel_query_begin(conn)) + if (!pgfdw_cancel_query_begin(conn, endtime)) return false; return pgfdw_cancel_query_end(conn, endtime, false); } static bool -pgfdw_cancel_query_begin(PGconn *conn) +pgfdw_cancel_query_begin(PGconn *conn, TimestampTz endtime) { - PGcancel *cancel; - char errbuf[256]; + bool timed_out = false; + bool failed = false; + PGcancelConn *cancel_conn = PQcancelCreate(conn); - /* - * Issue cancel request. Unfortunately, there's no good way to limit the - * amount of time that we might block inside PQgetCancel(). - */ - if ((cancel = PQgetCancel(conn))) + + if (!PQcancelStart(cancel_conn)) { - if (!PQcancel(cancel, errbuf, sizeof(errbuf))) + PG_TRY(); { ereport(WARNING, (errcode(ERRCODE_CONNECTION_FAILURE), errmsg("could not send cancel request: %s", - errbuf))); - PQfreeCancel(cancel); - return false; + pchomp(PQcancelErrorMessage(cancel_conn))))); } - PQfreeCancel(cancel); + PG_FINALLY(); + { + PQcancelFinish(cancel_conn); + } + PG_END_TRY(); + return false; } - return true; + /* In what follows, do not leak any PGcancelConn on an error. */ + PG_TRY(); + { + while (true) + { + TimestampTz now = GetCurrentTimestamp(); + long cur_timeout; + PostgresPollingStatusType pollres = PQcancelPoll(cancel_conn); + int waitEvents = WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH; + + if (pollres == PGRES_POLLING_OK) + { + break; + } + + /* If timeout has expired, give up, else get sleep time. */ + cur_timeout = TimestampDifferenceMilliseconds(now, endtime); + if (cur_timeout <= 0) + { + timed_out = true; + failed = true; + goto exit; + } + + switch (pollres) + { + case PGRES_POLLING_READING: + waitEvents |= WL_SOCKET_READABLE; + break; + case PGRES_POLLING_WRITING: + waitEvents |= WL_SOCKET_WRITEABLE; + break; + default: + failed = true; + goto exit; + } + + /* Sleep until there's something to do */ + WaitLatchOrSocket(MyLatch, waitEvents, PQcancelSocket(cancel_conn), + cur_timeout, PG_WAIT_EXTENSION); + ResetLatch(MyLatch); + + CHECK_FOR_INTERRUPTS(); + } +exit: ; + if (failed) + { + if (timed_out) + { + ereport(WARNING, + (errmsg("could not cancel request due to timeout"))); + } + else + { + ereport(WARNING, + (errcode(ERRCODE_CONNECTION_FAILURE), + errmsg("could not send cancel request: %s", + pchomp(PQcancelErrorMessage(cancel_conn))))); + } + } + } + PG_FINALLY(); + { + PQcancelFinish(cancel_conn); + } + PG_END_TRY(); + + return !failed; } static bool @@ -1685,7 +1753,10 @@ pgfdw_abort_cleanup_begin(ConnCacheEntry *entry, bool toplevel, */ if (PQtransactionStatus(entry->conn) == PQTRANS_ACTIVE) { - if (!pgfdw_cancel_query_begin(entry->conn)) + TimestampTz endtime = TimestampTzPlusMilliseconds(GetCurrentTimestamp(), + CONNECTION_CLEANUP_TIMEOUT); + + if (!pgfdw_cancel_query_begin(entry->conn, endtime)) return false; /* Unable to cancel running query */ *cancel_requested = lappend(*cancel_requested, entry); } diff --git a/contrib/postgres_fdw/expected/postgres_fdw.out b/contrib/postgres_fdw/expected/postgres_fdw.out index 58a603ac56..e03160bd97 100644 --- a/contrib/postgres_fdw/expected/postgres_fdw.out +++ b/contrib/postgres_fdw/expected/postgres_fdw.out @@ -2739,6 +2739,21 @@ SELECT t1.c1, t2.c2 FROM v4 t1 LEFT JOIN ft5 t2 ON (t1.c1 = t2.c1) ORDER BY t1.c (10 rows) ALTER VIEW v4 OWNER TO regress_view_owner; +-- Make sure this big CROSS JOIN query is pushed down +EXPLAIN (VERBOSE, COSTS OFF) SELECT count(*) FROM ft1 CROSS JOIN ft2 CROSS JOIN ft4 CROSS JOIN ft5; + QUERY PLAN +--------------------------------------------------------------------------------------------------------------------------------------------------------------------- + Foreign Scan + Output: (count(*)) + Relations: Aggregate on ((((public.ft1) INNER JOIN (public.ft2)) INNER JOIN (public.ft4)) INNER JOIN (public.ft5)) + Remote SQL: SELECT count(*) FROM ((("S 1"."T 1" r1 INNER JOIN "S 1"."T 1" r2 ON (TRUE)) INNER JOIN "S 1"."T 3" r4 ON (TRUE)) INNER JOIN "S 1"."T 4" r6 ON (TRUE)) +(4 rows) + +-- Make sure query cancellation works +SET statement_timeout = '10ms'; +select count(*) from ft1 CROSS JOIN ft2 CROSS JOIN ft4 CROSS JOIN ft5; -- this takes very long +ERROR: canceling statement due to statement timeout +RESET statement_timeout; -- ==================================================================== -- Check that userid to use when querying the remote table is correctly -- propagated into foreign rels present in subqueries under an UNION ALL diff --git a/contrib/postgres_fdw/sql/postgres_fdw.sql b/contrib/postgres_fdw/sql/postgres_fdw.sql index e3d147de6d..2626e68cc6 100644 --- a/contrib/postgres_fdw/sql/postgres_fdw.sql +++ b/contrib/postgres_fdw/sql/postgres_fdw.sql @@ -737,6 +737,13 @@ SELECT t1.c1, t2.c2 FROM v4 t1 LEFT JOIN ft5 t2 ON (t1.c1 = t2.c1) ORDER BY t1.c SELECT t1.c1, t2.c2 FROM v4 t1 LEFT JOIN ft5 t2 ON (t1.c1 = t2.c1) ORDER BY t1.c1, t2.c1 OFFSET 10 LIMIT 10; ALTER VIEW v4 OWNER TO regress_view_owner; +-- Make sure this big CROSS JOIN query is pushed down +EXPLAIN (VERBOSE, COSTS OFF) SELECT count(*) FROM ft1 CROSS JOIN ft2 CROSS JOIN ft4 CROSS JOIN ft5; +-- Make sure query cancellation works +SET statement_timeout = '10ms'; +select count(*) from ft1 CROSS JOIN ft2 CROSS JOIN ft4 CROSS JOIN ft5; -- this takes very long +RESET statement_timeout; + -- ==================================================================== -- Check that userid to use when querying the remote table is correctly -- propagated into foreign rels present in subqueries under an UNION ALL diff --git a/src/fe_utils/connect_utils.c b/src/fe_utils/connect_utils.c index 808d54461f..5ed9f3ba17 100644 --- a/src/fe_utils/connect_utils.c +++ b/src/fe_utils/connect_utils.c @@ -157,19 +157,14 @@ connectMaintenanceDatabase(ConnParams *cparams, void disconnectDatabase(PGconn *conn) { - char errbuf[256]; - Assert(conn != NULL); if (PQtransactionStatus(conn) == PQTRANS_ACTIVE) { - PGcancel *cancel; + PGcancelConn *cancelConn = PQcancelCreate(conn); - if ((cancel = PQgetCancel(conn))) - { - (void) PQcancel(cancel, errbuf, sizeof(errbuf)); - PQfreeCancel(cancel); - } + (void) PQcancelBlocking(cancelConn); + PQcancelFinish(cancelConn); } PQfinish(conn); diff --git a/src/test/isolation/isolationtester.c b/src/test/isolation/isolationtester.c index ed110f740f..0b342b5c2b 100644 --- a/src/test/isolation/isolationtester.c +++ b/src/test/isolation/isolationtester.c @@ -946,26 +946,21 @@ try_complete_step(TestSpec *testspec, PermutationStep *pstep, int flags) */ if (td > max_step_wait && !canceled) { - PGcancel *cancel = PQgetCancel(conn); + PGcancelConn *cancel_conn = PQcancelCreate(conn); - if (cancel != NULL) + if (PQcancelBlocking(cancel_conn)) { - char buf[256]; - - if (PQcancel(cancel, buf, sizeof(buf))) - { - /* - * print to stdout not stderr, as this should appear - * in the test case's results - */ - printf("isolationtester: canceling step %s after %d seconds\n", - step->name, (int) (td / USECS_PER_SEC)); - canceled = true; - } - else - fprintf(stderr, "PQcancel failed: %s\n", buf); - PQfreeCancel(cancel); + /* + * print to stdout not stderr, as this should appear in + * the test case's results + */ + printf("isolationtester: canceling step %s after %d seconds\n", + step->name, (int) (td / USECS_PER_SEC)); + canceled = true; } + else + fprintf(stderr, "PQcancel failed: %s\n", PQcancelErrorMessage(cancel_conn)); + PQcancelFinish(cancel_conn); } /* -- 2.39.2