diff --git a/contrib/postgres_fdw/connection.c b/contrib/postgres_fdw/connection.c
index 189f290..64a00b4 100644
--- a/contrib/postgres_fdw/connection.c
+++ b/contrib/postgres_fdw/connection.c
@@ -17,6 +17,7 @@
#include "access/xact.h"
#include "mb/pg_wchar.h"
#include "miscadmin.h"
+#include "storage/latch.h"
#include "utils/hsearch.h"
#include "utils/memutils.h"
@@ -448,6 +449,67 @@ GetPrepStmtNumber(PGconn *conn)
}
/*
+ * Execute query in non-blocking mode and fetch back result
+ */
+PGresult *
+pgfdw_exec_query(PGconn *conn, const char *query)
+{
+ if (!PQsendQuery(conn, query))
+ pgfdw_report_error(ERROR, NULL, conn, false, query);
+
+ return pgfdw_get_result(conn, query);
+}
+
+/*
+ * Scan for results of query run in non-waiting mode
+ *
+ * This should be run after executing a query on the remote server and
+ * offers quick responsiveness by checking for any interruptions. The
+ * last result is returned back to caller, and previous results are
+ * consumed. Caller is responsible for the error handling on the fetched
+ * result.
+ */
+PGresult *
+pgfdw_get_result(PGconn *conn, const char *query)
+{
+ PGresult *last_res;
+
+ for (;;)
+ {
+ PGresult *res;
+
+ while (PQisBusy(conn))
+ {
+ int wc;
+
+ /* Sleep until there's something to do */
+ wc = WaitLatchOrSocket(MyLatch,
+ WL_LATCH_SET | WL_SOCKET_READABLE,
+ PQsocket(conn),
+ -1L);
+ ResetLatch(MyLatch);
+
+ CHECK_FOR_INTERRUPTS();
+
+ /* Data available in socket */
+ if (wc & WL_SOCKET_READABLE)
+ {
+ if (!PQconsumeInput(conn))
+ pgfdw_report_error(ERROR, NULL, conn, false, query);
+ }
+ }
+
+ res = PQgetResult(conn);
+ if (res == NULL)
+ break;
+
+ last_res = res;
+ }
+
+ return last_res;
+}
+
+/*
* Report an error we got from the remote server.
*
* elevel: error level to use (typically ERROR, but might be less)
@@ -598,6 +660,32 @@ pgfdw_xact_callback(XactEvent event, void *arg)
case XACT_EVENT_ABORT:
/* Assume we might have lost track of prepared statements */
entry->have_error = true;
+
+ /*
+ * If a command has been submitted to the remote server
+ * using an asynchronous execution function, the command
+ * might not have yet completed. Check to see if a command
+ * is still being processed by the remote server, and if so,
+ * request cancellation of the command; if not, abort
+ * gracefully.
+ */
+ if (PQtransactionStatus(entry->conn) == PQTRANS_ACTIVE)
+ {
+ PGcancel *cancel;
+ char errbuf[256];
+
+ if ((cancel = PQgetCancel(entry->conn)))
+ {
+ if (!PQcancel(cancel, errbuf, sizeof(errbuf)))
+ ereport(WARNING,
+ (errcode(ERRCODE_CONNECTION_FAILURE),
+ errmsg("could not send cancel request: %s",
+ errbuf)));
+ PQfreeCancel(cancel);
+ }
+ break;
+ }
+
/* If we're aborting, abort all remote transactions too */
res = PQexec(entry->conn, "ABORT TRANSACTION");
/* Note: can't throw ERROR, it would be infinite loop */
diff --git a/contrib/postgres_fdw/postgres_fdw.c b/contrib/postgres_fdw/postgres_fdw.c
index ee0220a..31f5bb2 100644
--- a/contrib/postgres_fdw/postgres_fdw.c
+++ b/contrib/postgres_fdw/postgres_fdw.c
@@ -1421,7 +1421,7 @@ postgresReScanForeignScan(ForeignScanState *node)
* We don't use a PG_TRY block here, so be careful not to throw error
* without releasing the PGresult.
*/
- res = PQexec(fsstate->conn, sql);
+ res = pgfdw_exec_query(fsstate->conn, sql);
if (PQresultStatus(res) != PGRES_COMMAND_OK)
pgfdw_report_error(ERROR, res, fsstate->conn, true, sql);
PQclear(res);
@@ -1950,7 +1950,7 @@ postgresEndForeignModify(EState *estate,
* We don't use a PG_TRY block here, so be careful not to throw error
* without releasing the PGresult.
*/
- res = PQexec(fmstate->conn, sql);
+ res = pgfdw_exec_query(fmstate->conn, sql);
if (PQresultStatus(res) != PGRES_COMMAND_OK)
pgfdw_report_error(ERROR, res, fmstate->conn, true, sql);
PQclear(res);
@@ -2712,7 +2712,7 @@ get_remote_estimate(const char *sql, PGconn *conn,
/*
* Execute EXPLAIN remotely.
*/
- res = PQexec(conn, sql);
+ res = pgfdw_exec_query(conn, sql);
if (PQresultStatus(res) != PGRES_TUPLES_OK)
pgfdw_report_error(ERROR, res, conn, false, sql);
@@ -2821,8 +2821,11 @@ create_cursor(ForeignScanState *node)
* We don't use a PG_TRY block here, so be careful not to throw error
* without releasing the PGresult.
*/
- res = PQexecParams(conn, buf.data, numParams, NULL, values,
- NULL, NULL, 0);
+ if (!PQsendQueryParams(conn, buf.data, numParams,
+ NULL, values, NULL, NULL, 0))
+ pgfdw_report_error(ERROR, NULL, conn, false, buf.data);
+
+ res = pgfdw_get_result(conn, buf.data);
if (PQresultStatus(res) != PGRES_COMMAND_OK)
pgfdw_report_error(ERROR, res, conn, true, fsstate->query);
PQclear(res);
@@ -2868,7 +2871,7 @@ fetch_more_data(ForeignScanState *node)
snprintf(sql, sizeof(sql), "FETCH %d FROM c%u",
fsstate->fetch_size, fsstate->cursor_number);
- res = PQexec(conn, sql);
+ res = pgfdw_exec_query(conn, sql);
/* On error, report the original query, not the FETCH. */
if (PQresultStatus(res) != PGRES_TUPLES_OK)
pgfdw_report_error(ERROR, res, conn, false, fsstate->query);
@@ -2978,7 +2981,7 @@ close_cursor(PGconn *conn, unsigned int cursor_number)
* We don't use a PG_TRY block here, so be careful not to throw error
* without releasing the PGresult.
*/
- res = PQexec(conn, sql);
+ res = pgfdw_exec_query(conn, sql);
if (PQresultStatus(res) != PGRES_COMMAND_OK)
pgfdw_report_error(ERROR, res, conn, true, sql);
PQclear(res);
@@ -3151,8 +3154,12 @@ execute_dml_stmt(ForeignScanState *node)
* We don't use a PG_TRY block here, so be careful not to throw error
* without releasing the PGresult.
*/
- dmstate->result = PQexecParams(dmstate->conn, dmstate->query,
- numParams, NULL, values, NULL, NULL, 0);
+ if (!PQsendQueryParams(dmstate->conn, dmstate->query, numParams,
+ NULL, values, NULL, NULL, 0))
+ pgfdw_report_error(ERROR, NULL, dmstate->conn, false, dmstate->query);
+
+ dmstate->result = pgfdw_get_result(dmstate->conn, dmstate->query);
+
if (PQresultStatus(dmstate->result) !=
(dmstate->has_returning ? PGRES_TUPLES_OK : PGRES_COMMAND_OK))
pgfdw_report_error(ERROR, dmstate->result, dmstate->conn, true,
@@ -3355,7 +3362,7 @@ postgresAnalyzeForeignTable(Relation relation,
/* In what follows, do not risk leaking any PGresults. */
PG_TRY();
{
- res = PQexec(conn, sql.data);
+ res = pgfdw_exec_query(conn, sql.data);
if (PQresultStatus(res) != PGRES_TUPLES_OK)
pgfdw_report_error(ERROR, res, conn, false, sql.data);
@@ -3449,7 +3456,7 @@ postgresAcquireSampleRowsFunc(Relation relation, int elevel,
/* In what follows, do not risk leaking any PGresults. */
PG_TRY();
{
- res = PQexec(conn, sql.data);
+ res = pgfdw_exec_query(conn, sql.data);
if (PQresultStatus(res) != PGRES_COMMAND_OK)
pgfdw_report_error(ERROR, res, conn, false, sql.data);
PQclear(res);
@@ -3500,7 +3507,7 @@ postgresAcquireSampleRowsFunc(Relation relation, int elevel,
snprintf(fetch_sql, sizeof(fetch_sql), "FETCH %d FROM c%u",
fetch_size, cursor_number);
- res = PQexec(conn, fetch_sql);
+ res = pgfdw_exec_query(conn, fetch_sql);
/* On error, report the original query, not the FETCH. */
if (PQresultStatus(res) != PGRES_TUPLES_OK)
pgfdw_report_error(ERROR, res, conn, false, sql.data);
@@ -3675,7 +3682,7 @@ postgresImportForeignSchema(ImportForeignSchemaStmt *stmt, Oid serverOid)
appendStringInfoString(&buf, "SELECT 1 FROM pg_catalog.pg_namespace WHERE nspname = ");
deparseStringLiteral(&buf, stmt->remote_schema);
- res = PQexec(conn, buf.data);
+ res = pgfdw_exec_query(conn, buf.data);
if (PQresultStatus(res) != PGRES_TUPLES_OK)
pgfdw_report_error(ERROR, res, conn, false, buf.data);
@@ -3774,7 +3781,7 @@ postgresImportForeignSchema(ImportForeignSchemaStmt *stmt, Oid serverOid)
appendStringInfoString(&buf, " ORDER BY c.relname, a.attnum");
/* Fetch the data */
- res = PQexec(conn, buf.data);
+ res = pgfdw_exec_query(conn, buf.data);
if (PQresultStatus(res) != PGRES_TUPLES_OK)
pgfdw_report_error(ERROR, res, conn, false, buf.data);
diff --git a/contrib/postgres_fdw/postgres_fdw.h b/contrib/postgres_fdw/postgres_fdw.h
index 3a11d99..574b07d 100644
--- a/contrib/postgres_fdw/postgres_fdw.h
+++ b/contrib/postgres_fdw/postgres_fdw.h
@@ -103,6 +103,8 @@ extern PGconn *GetConnection(UserMapping *user, bool will_prep_stmt);
extern void ReleaseConnection(PGconn *conn);
extern unsigned int GetCursorNumber(PGconn *conn);
extern unsigned int GetPrepStmtNumber(PGconn *conn);
+extern PGresult *pgfdw_get_result(PGconn *conn, const char *query);
+extern PGresult *pgfdw_exec_query(PGconn *conn, const char *query);
extern void pgfdw_report_error(int elevel, PGresult *res, PGconn *conn,
bool clear, const char *sql);