diff --git a/contrib/postgres_fdw/connection.c b/contrib/postgres_fdw/connection.c index 1a1e5b5..f52d2ba 100644 --- a/contrib/postgres_fdw/connection.c +++ b/contrib/postgres_fdw/connection.c @@ -15,11 +15,15 @@ #include "postgres_fdw.h" #include "access/xact.h" +#include "access/xtm.h" +#include "access/transam.h" #include "mb/pg_wchar.h" #include "miscadmin.h" #include "utils/hsearch.h" #include "utils/memutils.h" +#undef DEBUG3 +#define DEBUG3 WARNING /* * Connection cache hash table entry @@ -68,6 +72,8 @@ static PGconn *connect_pg_server(ForeignServer *server, UserMapping *user); static void check_conn_params(const char **keywords, const char **values); static void configure_remote_session(PGconn *conn); static void do_sql_command(PGconn *conn, const char *sql); +static void do_sql_send_command(PGconn *conn, const char *sql); +static void do_sql_wait_command(PGconn *conn, const char *sql); static void begin_remote_xact(ConnCacheEntry *entry); static void pgfdw_xact_callback(XactEvent event, void *arg); static void pgfdw_subxact_callback(SubXactEvent event, @@ -358,6 +364,27 @@ do_sql_command(PGconn *conn, const char *sql) PQclear(res); } +static void +do_sql_send_command(PGconn *conn, const char *sql) +{ + if (PQsendQuery(conn, sql) != PGRES_COMMAND_OK) { + PGresult *res = PQgetResult(conn); + pgfdw_report_error(ERROR, res, conn, true, sql); + PQclear(res); + } +} + +static void +do_sql_wait_command(PGconn *conn, const char *sql) +{ + PGresult *res; + while ((res = PQgetResult(conn)) != NULL) { + if (PQresultStatus(res) != PGRES_COMMAND_OK) + pgfdw_report_error(ERROR, res, conn, true, sql); + PQclear(res); + } +} + /* * Start remote transaction or subtransaction, if needed. * @@ -376,11 +403,21 @@ begin_remote_xact(ConnCacheEntry *entry) /* Start main transaction if we haven't yet */ if (entry->xact_depth <= 0) { + TransactionId gxid = GetTransactionManager()->GetGlobalTransactionId(); const char *sql; elog(DEBUG3, "starting remote transaction on connection %p", entry->conn); + if (TransactionIdIsValid(gxid)) { + char stmt[64]; + PGresult *res; + + snprintf(stmt, sizeof(stmt), "select public.dtm_join_transaction(%d)", gxid); + res = PQexec(entry->conn, stmt); + PQclear(res); + } + if (IsolationIsSerializable()) sql = "START TRANSACTION ISOLATION LEVEL SERIALIZABLE"; else @@ -541,16 +578,36 @@ pgfdw_xact_callback(XactEvent event, void *arg) /* If it has an open remote transaction, try to close it */ if (entry->xact_depth > 0) { - elog(DEBUG3, "closing remote transaction on connection %p", - entry->conn); + elog(DEBUG3, "closing remote transaction on connection %p event %d", + entry->conn, event); switch (event) { case XACT_EVENT_PARALLEL_PRE_COMMIT: case XACT_EVENT_PRE_COMMIT: /* Commit all remote transactions during pre-commit */ - do_sql_command(entry->conn, "COMMIT TRANSACTION"); + do_sql_send_command(entry->conn, "COMMIT TRANSACTION"); + continue; + case XACT_EVENT_PRE_PREPARE: + /* + * We disallow remote transactions that modified anything, + * since it's not very reasonable to hold them open until + * the prepared transaction is committed. For the moment, + * throw error unconditionally; later we might allow + * read-only cases. Note that the error will cause us to + * come right back here with event == XACT_EVENT_ABORT, so + * we'll clean up the connection state at that point. + */ + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("cannot prepare a transaction that modified remote tables"))); + break; + + case XACT_EVENT_PARALLEL_COMMIT: + case XACT_EVENT_COMMIT: + case XACT_EVENT_PREPARE: + do_sql_wait_command(entry->conn, "COMMIT TRANSACTION"); /* * If there were any errors in subtransactions, and we * made prepared statements, do a DEALLOCATE ALL to make @@ -574,27 +631,7 @@ pgfdw_xact_callback(XactEvent event, void *arg) entry->have_prep_stmt = false; entry->have_error = false; break; - case XACT_EVENT_PRE_PREPARE: - /* - * We disallow remote transactions that modified anything, - * since it's not very reasonable to hold them open until - * the prepared transaction is committed. For the moment, - * throw error unconditionally; later we might allow - * read-only cases. Note that the error will cause us to - * come right back here with event == XACT_EVENT_ABORT, so - * we'll clean up the connection state at that point. - */ - ereport(ERROR, - (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), - errmsg("cannot prepare a transaction that modified remote tables"))); - break; - case XACT_EVENT_PARALLEL_COMMIT: - case XACT_EVENT_COMMIT: - case XACT_EVENT_PREPARE: - /* Pre-commit should have closed the open transaction */ - elog(ERROR, "missed cleaning up connection during pre-commit"); - break; case XACT_EVENT_PARALLEL_ABORT: case XACT_EVENT_ABORT: /* Assume we might have lost track of prepared statements */ @@ -618,6 +655,12 @@ pgfdw_xact_callback(XactEvent event, void *arg) entry->have_error = false; } break; + + case XACT_EVENT_START: + case XACT_EVENT_ABORT_PREPARED: + case XACT_EVENT_COMMIT_PREPARED: + break; + } } @@ -631,21 +674,23 @@ pgfdw_xact_callback(XactEvent event, void *arg) if (PQstatus(entry->conn) != CONNECTION_OK || PQtransactionStatus(entry->conn) != PQTRANS_IDLE) { - elog(DEBUG3, "discarding connection %p", entry->conn); + elog(DEBUG3, "discarding connection %p, conn status=%d, trans status=%d", entry->conn, PQstatus(entry->conn), PQtransactionStatus(entry->conn)); PQfinish(entry->conn); entry->conn = NULL; } } - /* - * Regardless of the event type, we can now mark ourselves as out of the - * transaction. (Note: if we are here during PRE_COMMIT or PRE_PREPARE, - * this saves a useless scan of the hashtable during COMMIT or PREPARE.) - */ - xact_got_connection = false; + if (event != XACT_EVENT_PARALLEL_PRE_COMMIT && event != XACT_EVENT_PRE_COMMIT) { + /* + * Regardless of the event type, we can now mark ourselves as out of the + * transaction. (Note: if we are here during PRE_COMMIT or PRE_PREPARE, + * this saves a useless scan of the hashtable during COMMIT or PREPARE.) + */ + xact_got_connection = false; - /* Also reset cursor numbering for next transaction */ - cursor_number = 0; + /* Also reset cursor numbering for next transaction */ + cursor_number = 0; + } } /* diff --git a/contrib/postgres_fdw/postgres_fdw--1.0.sql b/contrib/postgres_fdw/postgres_fdw--1.0.sql index a0f0fc1..0ce8f0e 100644 --- a/contrib/postgres_fdw/postgres_fdw--1.0.sql +++ b/contrib/postgres_fdw/postgres_fdw--1.0.sql @@ -16,3 +16,8 @@ LANGUAGE C STRICT; CREATE FOREIGN DATA WRAPPER postgres_fdw HANDLER postgres_fdw_handler VALIDATOR postgres_fdw_validator; + +CREATE FUNCTION postgres_fdw_exec(relid oid, sql cstring) +RETURNS void +AS 'MODULE_PATHNAME' +LANGUAGE C STRICT; diff --git a/contrib/postgres_fdw/postgres_fdw.c b/contrib/postgres_fdw/postgres_fdw.c index 1902f1f..1925c61 100644 --- a/contrib/postgres_fdw/postgres_fdw.c +++ b/contrib/postgres_fdw/postgres_fdw.c @@ -230,6 +230,7 @@ typedef struct * SQL functions */ PG_FUNCTION_INFO_V1(postgres_fdw_handler); +PG_FUNCTION_INFO_V1(postgres_fdw_exec); /* * FDW callback routines @@ -3002,3 +3003,19 @@ conversion_error_callback(void *arg) NameStr(tupdesc->attrs[errpos->cur_attno - 1]->attname), RelationGetRelationName(errpos->rel)); } + +Datum +postgres_fdw_exec(PG_FUNCTION_ARGS) +{ + Oid relid = PG_GETARG_OID(0); + char const* sql = PG_GETARG_CSTRING(1); + Oid userid = GetUserId(); + ForeignTable *table = GetForeignTable(relid); + ForeignServer *server = GetForeignServer(table->serverid); + UserMapping *user = GetUserMapping(userid, server->serverid); + PGconn* conn = GetConnection(server, user, false); + PGresult* res = PQexec(conn, sql); + PQclear(res); + ReleaseConnection(conn); + PG_RETURN_VOID(); +}