*** a/contrib/postgres_fdw/deparse.c --- b/contrib/postgres_fdw/deparse.c *************** *** 816,822 **** deparseTargetList(StringInfo buf, * * If params is not NULL, it receives a list of Params and other-relation Vars * used in the clauses; these values must be transmitted to the remote server ! * as parameter values. * * If params is NULL, we're generating the query for EXPLAIN purposes, * so Params and other-relation Vars should be replaced by dummy values. --- 816,822 ---- * * If params is not NULL, it receives a list of Params and other-relation Vars * used in the clauses; these values must be transmitted to the remote server ! * as parameter values. Caller is responsible for initializing it to empty. * * If params is NULL, we're generating the query for EXPLAIN purposes, * so Params and other-relation Vars should be replaced by dummy values. *************** *** 833,841 **** appendWhereClause(StringInfo buf, int nestlevel; ListCell *lc; - if (params) - *params = NIL; /* initialize result list to empty */ - /* Set up context struct for recursion */ context.root = root; context.foreignrel = baserel; --- 833,838 ---- *************** *** 971,976 **** deparseUpdateSql(StringInfo buf, PlannerInfo *root, --- 968,1030 ---- } /* + * deparse remote UPDATE statement + * + * The statement text is appended to buf, and we also create an integer List + * of the columns being retrieved by RETURNING (if any), which is returned + * to *retrieved_attrs. + */ + void + deparsePushedDownUpdateSql(StringInfo buf, PlannerInfo *root, + Index rtindex, Relation rel, + List *targetlist, + List *targetAttrs, + List *remote_conds, + List **params_list, + List *returningList, + List **retrieved_attrs) + { + RelOptInfo *baserel = root->simple_rel_array[rtindex]; + deparse_expr_cxt context; + bool first; + ListCell *lc; + + if (params_list) + *params_list = NIL; /* initialize result list to empty */ + + /* Set up context struct for recursion */ + context.root = root; + context.foreignrel = baserel; + context.buf = buf; + context.params_list = params_list; + + appendStringInfoString(buf, "UPDATE "); + deparseRelation(buf, rel); + appendStringInfoString(buf, " SET "); + + first = true; + foreach(lc, targetAttrs) + { + int attnum = lfirst_int(lc); + TargetEntry *tle = get_tle_by_resno(targetlist, attnum); + + if (!first) + appendStringInfoString(buf, ", "); + first = false; + + deparseColumnRef(buf, rtindex, attnum, root); + appendStringInfoString(buf, " = "); + deparseExpr((Expr *) tle->expr, &context); + } + if (remote_conds) + appendWhereClause(buf, root, baserel, remote_conds, + true, params_list); + + deparseReturningList(buf, root, rtindex, rel, false, + returningList, retrieved_attrs); + } + + /* * deparse remote DELETE statement * * The statement text is appended to buf, and we also create an integer List *************** *** 993,998 **** deparseDeleteSql(StringInfo buf, PlannerInfo *root, --- 1047,1082 ---- } /* + * deparse remote DELETE statement + * + * The statement text is appended to buf, and we also create an integer List + * of the columns being retrieved by RETURNING (if any), which is returned + * to *retrieved_attrs. + */ + void + deparsePushedDownDeleteSql(StringInfo buf, PlannerInfo *root, + Index rtindex, Relation rel, + List *remote_conds, + List **params_list, + List *returningList, + List **retrieved_attrs) + { + RelOptInfo *baserel = root->simple_rel_array[rtindex]; + + if (params_list) + *params_list = NIL; /* initialize result list to empty */ + + appendStringInfoString(buf, "DELETE FROM "); + deparseRelation(buf, rel); + if (remote_conds) + appendWhereClause(buf, root, baserel, remote_conds, + true, params_list); + + deparseReturningList(buf, root, rtindex, rel, false, + returningList, retrieved_attrs); + } + + /* * Add a RETURNING clause, if needed, to an INSERT/UPDATE/DELETE. */ static void *** a/contrib/postgres_fdw/expected/postgres_fdw.out --- b/contrib/postgres_fdw/expected/postgres_fdw.out *************** *** 1314,1320 **** INSERT INTO ft2 (c1,c2,c3) --- 1314,1339 ---- (3 rows) INSERT INTO ft2 (c1,c2,c3) VALUES (1104,204,'ddd'), (1105,205,'eee'); + EXPLAIN (verbose, costs off) + UPDATE ft2 SET c2 = c2 + 300, c3 = c3 || '_update3' WHERE c1 % 10 = 3; -- can be pushed down + QUERY PLAN + ---------------------------------------------------------------------------------------------------------------------- + Update on public.ft2 + -> Foreign Update on public.ft2 + Remote SQL: UPDATE "S 1"."T 1" SET c2 = (c2 + 300), c3 = (c3 || '_update3'::text) WHERE ((("C 1" % 10) = 3)) + (3 rows) + UPDATE ft2 SET c2 = c2 + 300, c3 = c3 || '_update3' WHERE c1 % 10 = 3; + EXPLAIN (verbose, costs off) + UPDATE ft2 SET c2 = c2 + 400, c3 = c3 || '_update7' WHERE c1 % 10 = 7 RETURNING *; -- can be pushed down + QUERY PLAN + ------------------------------------------------------------------------------------------------------------------------------------------------------------------ + Update on public.ft2 + Output: c1, c2, c3, c4, c5, c6, c7, c8 + -> Foreign Update on public.ft2 + Remote SQL: UPDATE "S 1"."T 1" SET c2 = (c2 + 400), c3 = (c3 || '_update7'::text) WHERE ((("C 1" % 10) = 7)) RETURNING "C 1", c2, c3, c4, c5, c6, c7, c8 + (4 rows) + UPDATE ft2 SET c2 = c2 + 400, c3 = c3 || '_update7' WHERE c1 % 10 = 7 RETURNING *; c1 | c2 | c3 | c4 | c5 | c6 | c7 | c8 ------+-----+--------------------+------------------------------+--------------------------+----+------------+----- *************** *** 1424,1430 **** UPDATE ft2 SET c2 = c2 + 400, c3 = c3 || '_update7' WHERE c1 % 10 = 7 RETURNING EXPLAIN (verbose, costs off) UPDATE ft2 SET c2 = ft2.c2 + 500, c3 = ft2.c3 || '_update9', c7 = DEFAULT ! FROM ft1 WHERE ft1.c1 = ft2.c2 AND ft1.c1 % 10 = 9; QUERY PLAN ------------------------------------------------------------------------------------------------------------------------------------------------------------------- Update on public.ft2 --- 1443,1449 ---- EXPLAIN (verbose, costs off) UPDATE ft2 SET c2 = ft2.c2 + 500, c3 = ft2.c3 || '_update9', c7 = DEFAULT ! FROM ft1 WHERE ft1.c1 = ft2.c2 AND ft1.c1 % 10 = 9; -- can't be pushed down QUERY PLAN ------------------------------------------------------------------------------------------------------------------------------------------------------------------- Update on public.ft2 *************** *** 1445,1460 **** UPDATE ft2 SET c2 = ft2.c2 + 500, c3 = ft2.c3 || '_update9', c7 = DEFAULT UPDATE ft2 SET c2 = ft2.c2 + 500, c3 = ft2.c3 || '_update9', c7 = DEFAULT FROM ft1 WHERE ft1.c1 = ft2.c2 AND ft1.c1 % 10 = 9; EXPLAIN (verbose, costs off) ! DELETE FROM ft2 WHERE c1 % 10 = 5 RETURNING c1, c4; ! QUERY PLAN ! ---------------------------------------------------------------------------------------- Delete on public.ft2 Output: c1, c4 ! Remote SQL: DELETE FROM "S 1"."T 1" WHERE ctid = $1 RETURNING "C 1", c4 ! -> Foreign Scan on public.ft2 ! Output: ctid ! Remote SQL: SELECT ctid FROM "S 1"."T 1" WHERE ((("C 1" % 10) = 5)) FOR UPDATE ! (6 rows) DELETE FROM ft2 WHERE c1 % 10 = 5 RETURNING c1, c4; c1 | c4 --- 1464,1477 ---- UPDATE ft2 SET c2 = ft2.c2 + 500, c3 = ft2.c3 || '_update9', c7 = DEFAULT FROM ft1 WHERE ft1.c1 = ft2.c2 AND ft1.c1 % 10 = 9; EXPLAIN (verbose, costs off) ! DELETE FROM ft2 WHERE c1 % 10 = 5 RETURNING c1, c4; -- can be pushed down ! QUERY PLAN ! -------------------------------------------------------------------------------------------- Delete on public.ft2 Output: c1, c4 ! -> Foreign Delete on public.ft2 ! Remote SQL: DELETE FROM "S 1"."T 1" WHERE ((("C 1" % 10) = 5)) RETURNING "C 1", c4 ! (4 rows) DELETE FROM ft2 WHERE c1 % 10 = 5 RETURNING c1, c4; c1 | c4 *************** *** 1565,1571 **** DELETE FROM ft2 WHERE c1 % 10 = 5 RETURNING c1, c4; (103 rows) EXPLAIN (verbose, costs off) ! DELETE FROM ft2 USING ft1 WHERE ft1.c1 = ft2.c2 AND ft1.c1 % 10 = 2; QUERY PLAN ---------------------------------------------------------------------------------------------------------------------- Delete on public.ft2 --- 1582,1588 ---- (103 rows) EXPLAIN (verbose, costs off) ! DELETE FROM ft2 USING ft1 WHERE ft1.c1 = ft2.c2 AND ft1.c1 % 10 = 2; -- can't be pushed down QUERY PLAN ---------------------------------------------------------------------------------------------------------------------- Delete on public.ft2 *************** *** 2408,2413 **** SELECT c1,c2,c3,c4 FROM ft2 ORDER BY c1; --- 2425,2443 ---- 1104 | 204 | ddd | (819 rows) + EXPLAIN (verbose, costs off) + UPDATE ft2 SET c7 = DEFAULT WHERE c1 % 10 = 0 AND date(c4) = '1970-01-01'::date; -- can't be pushed down + QUERY PLAN + ----------------------------------------------------------------------------------------------------------------------- + Update on public.ft2 + Remote SQL: UPDATE "S 1"."T 1" SET c7 = $2 WHERE ctid = $1 + -> Foreign Scan on public.ft2 + Output: c1, c2, NULL::integer, c3, c4, c5, c6, 'ft2 '::character(10), c8, ctid + Filter: (date(ft2.c4) = '01-01-1970'::date) + Remote SQL: SELECT "C 1", c2, c3, c4, c5, c6, c8, ctid FROM "S 1"."T 1" WHERE ((("C 1" % 10) = 0)) FOR UPDATE + (6 rows) + + UPDATE ft2 SET c7 = DEFAULT WHERE c1 % 10 = 0 AND date(c4) = '1970-01-01'::date; -- Test that trigger on remote table works as expected CREATE OR REPLACE FUNCTION "S 1".F_BRTRIG() RETURNS trigger AS $$ BEGIN *************** *** 2554,2560 **** CONTEXT: Remote SQL command: INSERT INTO "S 1"."T 1"("C 1", c2, c3, c4, c5, c6, UPDATE ft1 SET c2 = -c2 WHERE c1 = 1; -- c2positive ERROR: new row for relation "T 1" violates check constraint "c2positive" DETAIL: Failing row contains (1, -1, 00001_trig_update, 1970-01-02 08:00:00+00, 1970-01-02 00:00:00, 1, 1 , foo). ! CONTEXT: Remote SQL command: UPDATE "S 1"."T 1" SET c2 = $2 WHERE ctid = $1 -- Test savepoint/rollback behavior select c2, count(*) from ft2 where c2 < 500 group by 1 order by 1; c2 | count --- 2584,2590 ---- UPDATE ft1 SET c2 = -c2 WHERE c1 = 1; -- c2positive ERROR: new row for relation "T 1" violates check constraint "c2positive" DETAIL: Failing row contains (1, -1, 00001_trig_update, 1970-01-02 08:00:00+00, 1970-01-02 00:00:00, 1, 1 , foo). ! CONTEXT: Remote SQL command: UPDATE "S 1"."T 1" SET c2 = (- c2) WHERE (("C 1" = 1)) -- Test savepoint/rollback behavior select c2, count(*) from ft2 where c2 < 500 group by 1 order by 1; c2 | count *************** *** 2713,2719 **** savepoint s3; update ft2 set c2 = -2 where c2 = 42 and c1 = 10; -- fail on remote side ERROR: new row for relation "T 1" violates check constraint "c2positive" DETAIL: Failing row contains (10, -2, 00010_trig_update_trig_update, 1970-01-11 08:00:00+00, 1970-01-11 00:00:00, 0, 0 , foo). ! CONTEXT: Remote SQL command: UPDATE "S 1"."T 1" SET c2 = $2 WHERE ctid = $1 rollback to savepoint s3; select c2, count(*) from ft2 where c2 < 500 group by 1 order by 1; c2 | count --- 2743,2749 ---- update ft2 set c2 = -2 where c2 = 42 and c1 = 10; -- fail on remote side ERROR: new row for relation "T 1" violates check constraint "c2positive" DETAIL: Failing row contains (10, -2, 00010_trig_update_trig_update, 1970-01-11 08:00:00+00, 1970-01-11 00:00:00, 0, 0 , foo). ! CONTEXT: Remote SQL command: UPDATE "S 1"."T 1" SET c2 = (-2) WHERE ((c2 = 42)) AND (("C 1" = 10)) rollback to savepoint s3; select c2, count(*) from ft2 where c2 < 500 group by 1 order by 1; c2 | count *************** *** 2853,2859 **** CONTEXT: Remote SQL command: INSERT INTO "S 1"."T 1"("C 1", c2, c3, c4, c5, c6, UPDATE ft1 SET c2 = -c2 WHERE c1 = 1; -- c2positive ERROR: new row for relation "T 1" violates check constraint "c2positive" DETAIL: Failing row contains (1, -1, 00001_trig_update, 1970-01-02 08:00:00+00, 1970-01-02 00:00:00, 1, 1 , foo). ! CONTEXT: Remote SQL command: UPDATE "S 1"."T 1" SET c2 = $2 WHERE ctid = $1 ALTER FOREIGN TABLE ft1 DROP CONSTRAINT ft1_c2positive; -- But inconsistent check constraints provide inconsistent results ALTER FOREIGN TABLE ft1 ADD CONSTRAINT ft1_c2negative CHECK (c2 < 0); --- 2883,2889 ---- UPDATE ft1 SET c2 = -c2 WHERE c1 = 1; -- c2positive ERROR: new row for relation "T 1" violates check constraint "c2positive" DETAIL: Failing row contains (1, -1, 00001_trig_update, 1970-01-02 08:00:00+00, 1970-01-02 00:00:00, 1, 1 , foo). ! CONTEXT: Remote SQL command: UPDATE "S 1"."T 1" SET c2 = (- c2) WHERE (("C 1" = 1)) ALTER FOREIGN TABLE ft1 DROP CONSTRAINT ft1_c2positive; -- But inconsistent check constraints provide inconsistent results ALTER FOREIGN TABLE ft1 ADD CONSTRAINT ft1_c2negative CHECK (c2 < 0); *************** *** 3246,3251 **** NOTICE: NEW: (13,"test triggered !") --- 3276,3474 ---- (0,27) (1 row) + -- cleanup + DROP TRIGGER trig_row_before ON rem1; + DROP TRIGGER trig_row_after ON rem1; + DROP TRIGGER trig_local_before ON loc1; + -- Test update-pushdown functionality + -- Test with statement-level triggers + CREATE TRIGGER trig_stmt_before + BEFORE DELETE OR INSERT OR UPDATE ON rem1 + FOR EACH STATEMENT EXECUTE PROCEDURE trigger_func(); + EXPLAIN (verbose, costs off) + UPDATE rem1 set f2 = ''; -- can be pushed down + QUERY PLAN + ---------------------------------------------------------- + Update on public.rem1 + -> Foreign Update on public.rem1 + Remote SQL: UPDATE public.loc1 SET f2 = ''::text + (3 rows) + + EXPLAIN (verbose, costs off) + DELETE FROM rem1; -- can be pushed down + QUERY PLAN + --------------------------------------------- + Delete on public.rem1 + -> Foreign Delete on public.rem1 + Remote SQL: DELETE FROM public.loc1 + (3 rows) + + DROP TRIGGER trig_stmt_before ON rem1; + CREATE TRIGGER trig_stmt_after + AFTER DELETE OR INSERT OR UPDATE ON rem1 + FOR EACH STATEMENT EXECUTE PROCEDURE trigger_func(); + EXPLAIN (verbose, costs off) + UPDATE rem1 set f2 = ''; -- can be pushed down + QUERY PLAN + ---------------------------------------------------------- + Update on public.rem1 + -> Foreign Update on public.rem1 + Remote SQL: UPDATE public.loc1 SET f2 = ''::text + (3 rows) + + EXPLAIN (verbose, costs off) + DELETE FROM rem1; -- can be pushed down + QUERY PLAN + --------------------------------------------- + Delete on public.rem1 + -> Foreign Delete on public.rem1 + Remote SQL: DELETE FROM public.loc1 + (3 rows) + + DROP TRIGGER trig_stmt_after ON rem1; + -- Test with row-level ON INSERT triggers + CREATE TRIGGER trig_row_before_insert + BEFORE INSERT ON rem1 + FOR EACH ROW EXECUTE PROCEDURE trigger_data(23,'skidoo'); + EXPLAIN (verbose, costs off) + UPDATE rem1 set f2 = ''; -- can be pushed down + QUERY PLAN + ---------------------------------------------------------- + Update on public.rem1 + -> Foreign Update on public.rem1 + Remote SQL: UPDATE public.loc1 SET f2 = ''::text + (3 rows) + + EXPLAIN (verbose, costs off) + DELETE FROM rem1; -- can be pushed down + QUERY PLAN + --------------------------------------------- + Delete on public.rem1 + -> Foreign Delete on public.rem1 + Remote SQL: DELETE FROM public.loc1 + (3 rows) + + DROP TRIGGER trig_row_before_insert ON rem1; + CREATE TRIGGER trig_row_after_insert + AFTER INSERT ON rem1 + FOR EACH ROW EXECUTE PROCEDURE trigger_data(23,'skidoo'); + EXPLAIN (verbose, costs off) + UPDATE rem1 set f2 = ''; -- can be pushed down + QUERY PLAN + ---------------------------------------------------------- + Update on public.rem1 + -> Foreign Update on public.rem1 + Remote SQL: UPDATE public.loc1 SET f2 = ''::text + (3 rows) + + EXPLAIN (verbose, costs off) + DELETE FROM rem1; -- can be pushed down + QUERY PLAN + --------------------------------------------- + Delete on public.rem1 + -> Foreign Delete on public.rem1 + Remote SQL: DELETE FROM public.loc1 + (3 rows) + + DROP TRIGGER trig_row_after_insert ON rem1; + -- Test with row-level ON UPDATE triggers + CREATE TRIGGER trig_row_before_update + BEFORE UPDATE ON rem1 + FOR EACH ROW EXECUTE PROCEDURE trigger_data(23,'skidoo'); + EXPLAIN (verbose, costs off) + UPDATE rem1 set f2 = ''; -- can't be pushed down + QUERY PLAN + --------------------------------------------------------------------- + Update on public.rem1 + Remote SQL: UPDATE public.loc1 SET f2 = $2 WHERE ctid = $1 + -> Foreign Scan on public.rem1 + Output: f1, ''::text, ctid, rem1.* + Remote SQL: SELECT f1, f2, ctid FROM public.loc1 FOR UPDATE + (5 rows) + + EXPLAIN (verbose, costs off) + DELETE FROM rem1; -- can be pushed down + QUERY PLAN + --------------------------------------------- + Delete on public.rem1 + -> Foreign Delete on public.rem1 + Remote SQL: DELETE FROM public.loc1 + (3 rows) + + DROP TRIGGER trig_row_before_update ON rem1; + CREATE TRIGGER trig_row_after_update + AFTER UPDATE ON rem1 + FOR EACH ROW EXECUTE PROCEDURE trigger_data(23,'skidoo'); + EXPLAIN (verbose, costs off) + UPDATE rem1 set f2 = ''; -- can't be pushed down + QUERY PLAN + ------------------------------------------------------------------------------- + Update on public.rem1 + Remote SQL: UPDATE public.loc1 SET f2 = $2 WHERE ctid = $1 RETURNING f1, f2 + -> Foreign Scan on public.rem1 + Output: f1, ''::text, ctid, rem1.* + Remote SQL: SELECT f1, f2, ctid FROM public.loc1 FOR UPDATE + (5 rows) + + EXPLAIN (verbose, costs off) + DELETE FROM rem1; -- can be pushed down + QUERY PLAN + --------------------------------------------- + Delete on public.rem1 + -> Foreign Delete on public.rem1 + Remote SQL: DELETE FROM public.loc1 + (3 rows) + + DROP TRIGGER trig_row_after_update ON rem1; + -- Test with row-level ON DELETE triggers + CREATE TRIGGER trig_row_before_delete + BEFORE DELETE ON rem1 + FOR EACH ROW EXECUTE PROCEDURE trigger_data(23,'skidoo'); + EXPLAIN (verbose, costs off) + UPDATE rem1 set f2 = ''; -- can be pushed down + QUERY PLAN + ---------------------------------------------------------- + Update on public.rem1 + -> Foreign Update on public.rem1 + Remote SQL: UPDATE public.loc1 SET f2 = ''::text + (3 rows) + + EXPLAIN (verbose, costs off) + DELETE FROM rem1; -- can't be pushed down + QUERY PLAN + --------------------------------------------------------------------- + Delete on public.rem1 + Remote SQL: DELETE FROM public.loc1 WHERE ctid = $1 + -> Foreign Scan on public.rem1 + Output: ctid, rem1.* + Remote SQL: SELECT f1, f2, ctid FROM public.loc1 FOR UPDATE + (5 rows) + + DROP TRIGGER trig_row_before_delete ON rem1; + CREATE TRIGGER trig_row_after_delete + AFTER DELETE ON rem1 + FOR EACH ROW EXECUTE PROCEDURE trigger_data(23,'skidoo'); + EXPLAIN (verbose, costs off) + UPDATE rem1 set f2 = ''; -- can be pushed down + QUERY PLAN + ---------------------------------------------------------- + Update on public.rem1 + -> Foreign Update on public.rem1 + Remote SQL: UPDATE public.loc1 SET f2 = ''::text + (3 rows) + + EXPLAIN (verbose, costs off) + DELETE FROM rem1; -- can't be pushed down + QUERY PLAN + ------------------------------------------------------------------------ + Delete on public.rem1 + Remote SQL: DELETE FROM public.loc1 WHERE ctid = $1 RETURNING f1, f2 + -> Foreign Scan on public.rem1 + Output: ctid, rem1.* + Remote SQL: SELECT f1, f2, ctid FROM public.loc1 FOR UPDATE + (5 rows) + + DROP TRIGGER trig_row_after_delete ON rem1; -- =================================================================== -- test inheritance features -- =================================================================== *** a/contrib/postgres_fdw/postgres_fdw.c --- b/contrib/postgres_fdw/postgres_fdw.c *************** *** 57,63 **** PG_MODULE_MAGIC; * planner to executor. Currently we store: * * 1) SELECT statement text to be sent to the remote server ! * 2) Integer list of attribute numbers retrieved by the SELECT * * These items are indexed with the enum FdwScanPrivateIndex, so an item * can be fetched with list_nth(). For example, to get the SELECT statement: --- 57,64 ---- * planner to executor. Currently we store: * * 1) SELECT statement text to be sent to the remote server ! * 2) List of restriction clauses that can be executed remotely ! * 3) Integer list of attribute numbers retrieved by the SELECT * * These items are indexed with the enum FdwScanPrivateIndex, so an item * can be fetched with list_nth(). For example, to get the SELECT statement: *************** *** 67,72 **** enum FdwScanPrivateIndex --- 68,75 ---- { /* SQL statement to execute remotely (as a String node) */ FdwScanPrivateSelectSql, + /* List of restriction clauses that can be executed remotely */ + FdwScanPrivateRemoteConds, /* Integer list of attribute numbers retrieved by the SELECT */ FdwScanPrivateRetrievedAttrs }; *************** *** 94,99 **** enum FdwModifyPrivateIndex --- 97,124 ---- }; /* + * Similarly, this enum describes what's kept in the fdw_private list for + * a ForeignScan node referencing a postgres_fdw foreign table when a DML + * query is pushed down to the remote server. We store: + * + * 1) UPDATE/DELETE statement text to be sent to the remote server + * 2) Boolean flag showing if the remote query has a RETURNING clause + * 3) Integer list of attribute numbers retrieved by RETURNING, if any + * 4) Boolean flag showing if we set the command es_processed + */ + enum FdwDmlPushdownPrivateIndex + { + /* SQL statement to execute remotely (as a String node) */ + FdwDmlPushdownPrivateUpdateSql, + /* has-returning flag (as an integer Value node) */ + FdwDmlPushdownPrivateHasReturning, + /* Integer list of attribute numbers retrieved by RETURNING */ + FdwDmlPushdownPrivateRetrievedAttrs, + /* set-processed flag (as an integer Value node) */ + FdwDmlPushdownPrivateSetProcessed + }; + + /* * Execution state of a foreign scan using postgres_fdw. */ typedef struct PgFdwScanState *************** *** 156,161 **** typedef struct PgFdwModifyState --- 181,217 ---- } PgFdwModifyState; /* + * Execution state of a foreign scan using postgres_fdw. + */ + typedef struct PgFdwDmlPushdownState + { + Relation rel; /* relcache entry for the foreign table */ + AttInMetadata *attinmeta; /* attribute datatype conversion metadata */ + + /* extracted fdw_private data */ + char *query; /* text of UPDATE/DELETE command */ + List *retrieved_attrs; /* list of retrieved attribute numbers */ + bool has_returning; /* is there a RETURNING clause? */ + bool set_processed; /* do we set the command es_processed? */ + + /* for remote query execution */ + PGconn *conn; /* connection for the scan */ + int numParams; /* number of parameters passed to query */ + FmgrInfo *param_flinfo; /* output conversion functions for them */ + List *param_exprs; /* executable expressions for param values */ + const char **param_values; /* textual values of query parameters */ + + /* for storing result tuples */ + PGresult *result; /* result for query */ + int num_tuples; /* # of tuples in array */ + int next_tuple; /* index of next one to return */ + + /* working memory contexts */ + MemoryContext batch_cxt; /* context holding current batch of tuples */ + MemoryContext temp_cxt; /* context for per-tuple temporary data */ + } PgFdwDmlPushdownState; + + /* * Workspace for analyzing a foreign table. */ typedef struct PgFdwAnalyzeState *************** *** 246,251 **** static TupleTableSlot *postgresExecForeignDelete(EState *estate, --- 302,314 ---- TupleTableSlot *planSlot); static void postgresEndForeignModify(EState *estate, ResultRelInfo *resultRelInfo); + static bool postgresPlanDMLPushdown(PlannerInfo *root, + ModifyTable *plan, + Index resultRelation, + int subplan_index); + static void postgresBeginDMLPushdown(ForeignScanState *node, int eflags); + static TupleTableSlot *postgresIterateDMLPushdown(ForeignScanState *node); + static void postgresEndDMLPushdown(ForeignScanState *node); static int postgresIsForeignRelUpdatable(Relation rel); static void postgresExplainForeignScan(ForeignScanState *node, ExplainState *es); *************** *** 254,259 **** static void postgresExplainForeignModify(ModifyTableState *mtstate, --- 317,324 ---- List *fdw_private, int subplan_index, ExplainState *es); + static void postgresExplainDMLPushdown(ForeignScanState *node, + ExplainState *es); static bool postgresAnalyzeForeignTable(Relation relation, AcquireSampleRowsFunc *func, BlockNumber *totalpages); *************** *** 288,295 **** static void prepare_foreign_modify(PgFdwModifyState *fmstate); static const char **convert_prep_stmt_params(PgFdwModifyState *fmstate, ItemPointer tupleid, TupleTableSlot *slot); ! static void store_returning_result(PgFdwModifyState *fmstate, ! TupleTableSlot *slot, PGresult *res); static int postgresAcquireSampleRowsFunc(Relation relation, int elevel, HeapTuple *rows, int targrows, double *totalrows, --- 353,374 ---- static const char **convert_prep_stmt_params(PgFdwModifyState *fmstate, ItemPointer tupleid, TupleTableSlot *slot); ! static void store_returning_result(TupleTableSlot *slot, ! PGresult *res, ! int row, ! Relation rel, ! AttInMetadata *attinmeta, ! List *retrieved_attrs, ! MemoryContext temp_context); ! static bool dml_is_pushdown_safe(PlannerInfo *root, ! ModifyTable *plan, ! Index resultRelation, ! int subplan_index, ! Relation rel, ! List *targetAttrs); ! static void execute_dml_stmt(ForeignScanState *node); ! static bool relation_has_row_level_triggers(Relation rel, CmdType operation); ! static TupleTableSlot *get_returning_result(ForeignScanState *node); static int postgresAcquireSampleRowsFunc(Relation relation, int elevel, HeapTuple *rows, int targrows, double *totalrows, *************** *** 331,341 **** postgres_fdw_handler(PG_FUNCTION_ARGS) --- 410,425 ---- routine->ExecForeignUpdate = postgresExecForeignUpdate; routine->ExecForeignDelete = postgresExecForeignDelete; routine->EndForeignModify = postgresEndForeignModify; + routine->PlanDMLPushdown = postgresPlanDMLPushdown; + routine->BeginDMLPushdown = postgresBeginDMLPushdown; + routine->IterateDMLPushdown = postgresIterateDMLPushdown; + routine->EndDMLPushdown = postgresEndDMLPushdown; routine->IsForeignRelUpdatable = postgresIsForeignRelUpdatable; /* Support functions for EXPLAIN */ routine->ExplainForeignScan = postgresExplainForeignScan; routine->ExplainForeignModify = postgresExplainForeignModify; + routine->ExplainDMLPushdown = postgresExplainDMLPushdown; /* Support functions for ANALYZE */ routine->AnalyzeForeignTable = postgresAnalyzeForeignTable; *************** *** 1067,1073 **** postgresGetForeignPlan(PlannerInfo *root, * Build the fdw_private list that will be available to the executor. * Items in the list must match enum FdwScanPrivateIndex, above. */ ! fdw_private = list_make2(makeString(sql.data), retrieved_attrs); /* --- 1151,1158 ---- * Build the fdw_private list that will be available to the executor. * Items in the list must match enum FdwScanPrivateIndex, above. */ ! fdw_private = list_make3(makeString(sql.data), ! remote_conds, retrieved_attrs); /* *************** *** 1363,1375 **** postgresAddForeignUpdateTargets(Query *parsetree, /* * postgresPlanForeignModify * Plan an insert/update/delete operation on a foreign table - * - * Note: currently, the plan tree generated for UPDATE/DELETE will always - * include a ForeignScan that retrieves ctids (using SELECT FOR UPDATE) - * and then the ModifyTable node will have to execute individual remote - * UPDATE/DELETE commands. If there are no local conditions or joins - * needed, it'd be better to let the scan node do UPDATE/DELETE RETURNING - * and then do nothing at ModifyTable. Room for future optimization ... */ static List * postgresPlanForeignModify(PlannerInfo *root, --- 1448,1453 ---- *************** *** 1644,1650 **** postgresExecForeignInsert(EState *estate, { n_rows = PQntuples(res); if (n_rows > 0) ! store_returning_result(fmstate, slot, res); } else n_rows = atoi(PQcmdTuples(res)); --- 1722,1732 ---- { n_rows = PQntuples(res); if (n_rows > 0) ! store_returning_result(slot, res, 0, ! fmstate->rel, ! fmstate->attinmeta, ! fmstate->retrieved_attrs, ! fmstate->temp_cxt); } else n_rows = atoi(PQcmdTuples(res)); *************** *** 1714,1720 **** postgresExecForeignUpdate(EState *estate, { n_rows = PQntuples(res); if (n_rows > 0) ! store_returning_result(fmstate, slot, res); } else n_rows = atoi(PQcmdTuples(res)); --- 1796,1806 ---- { n_rows = PQntuples(res); if (n_rows > 0) ! store_returning_result(slot, res, 0, ! fmstate->rel, ! fmstate->attinmeta, ! fmstate->retrieved_attrs, ! fmstate->temp_cxt); } else n_rows = atoi(PQcmdTuples(res)); *************** *** 1784,1790 **** postgresExecForeignDelete(EState *estate, { n_rows = PQntuples(res); if (n_rows > 0) ! store_returning_result(fmstate, slot, res); } else n_rows = atoi(PQcmdTuples(res)); --- 1870,1880 ---- { n_rows = PQntuples(res); if (n_rows > 0) ! store_returning_result(slot, res, 0, ! fmstate->rel, ! fmstate->attinmeta, ! fmstate->retrieved_attrs, ! fmstate->temp_cxt); } else n_rows = atoi(PQcmdTuples(res)); *************** *** 1837,1842 **** postgresEndForeignModify(EState *estate, --- 1927,2242 ---- } /* + * postgresPlanDMLPushdown + * Plan to push down an update/delete operation on a foreign table + */ + static bool + postgresPlanDMLPushdown(PlannerInfo *root, + ModifyTable *plan, + Index resultRelation, + int subplan_index) + { + CmdType operation = plan->operation; + RangeTblEntry *rte = planner_rt_fetch(resultRelation, root); + Relation rel; + StringInfoData sql; + List *targetAttrs = NIL; + List *params_list = NIL; + List *returningList = NIL; + List *retrieved_attrs = NIL; + List *remote_conds; + ForeignScan *fscan; + + if (operation == CMD_INSERT) + return false; + + /* + * Core code already has some lock on each rel being planned, so we can + * use NoLock here. + */ + rel = heap_open(rte->relid, NoLock); + + /* + * In an UPDATE, we transmit only columns that were explicitly targets + * of the UPDATE, so as to avoid unnecessary data transmission. + */ + if (operation == CMD_UPDATE) + { + int col; + + col = -1; + while ((col = bms_next_member(rte->updatedCols, col)) >= 0) + { + /* bit numbers are offset by FirstLowInvalidHeapAttributeNumber */ + AttrNumber attno = col + FirstLowInvalidHeapAttributeNumber; + + if (attno <= InvalidAttrNumber) /* shouldn't happen */ + elog(ERROR, "system-column update is not supported"); + targetAttrs = lappend_int(targetAttrs, attno); + } + } + + /* + * Ok, check to see whether it's safe to push the command down. + */ + if (!dml_is_pushdown_safe(root, plan, + resultRelation, + subplan_index, + rel, targetAttrs)) + { + heap_close(rel, NoLock); + return false; + } + + /* + * Ok, modify subplan so as to execute the pushed-down command. + */ + fscan = (ForeignScan *) list_nth(plan->plans, subplan_index); + + /* + * Extract the baserestrictinfo clauses that can be evaluated remotely. + */ + remote_conds = (List *) list_nth(fscan->fdw_private, + FdwScanPrivateRemoteConds); + + /* + * Extract the relevant RETURNING list if any. + */ + if (plan->returningLists) + returningList = (List *) list_nth(plan->returningLists, subplan_index); + + /* + * Construct the SQL command string. + */ + initStringInfo(&sql); + switch (operation) + { + case CMD_UPDATE: + deparsePushedDownUpdateSql(&sql, root, resultRelation, rel, + ((Plan *) fscan)->targetlist, + targetAttrs, + remote_conds, ¶ms_list, + returningList, &retrieved_attrs); + break; + case CMD_DELETE: + deparsePushedDownDeleteSql(&sql, root, resultRelation, rel, + remote_conds, ¶ms_list, + returningList, &retrieved_attrs); + break; + default: + elog(ERROR, "unexpected operation: %d", (int) operation); + break; + } + + /* + * Update the operation info. + */ + fscan->operation = operation; + + /* + * Update the fdw_exprs list that will be available to the executor. + */ + fscan->fdw_exprs = params_list; + + /* + * Update the fdw_private list that will be available to the executor. + * Items in the list must match enum FdwDmlPushdownPrivateIndex, above. + */ + fscan->fdw_private = list_make4(makeString(sql.data), + makeInteger((retrieved_attrs != NIL)), + retrieved_attrs, + makeInteger(plan->canSetTag)); + + heap_close(rel, NoLock); + return true; + } + + /* + * postgresBeginDMLPushdown + * Initiate a pushed-down update/delete operation on a foreign table + */ + static void + postgresBeginDMLPushdown(ForeignScanState *node, int eflags) + { + ForeignScan *fsplan = (ForeignScan *) node->ss.ps.plan; + EState *estate = node->ss.ps.state; + PgFdwDmlPushdownState *dpstate; + RangeTblEntry *rte; + Oid userid; + ForeignTable *table; + ForeignServer *server; + UserMapping *user; + int numParams; + int i; + ListCell *lc; + + /* + * Do nothing in EXPLAIN (no ANALYZE) case. node->fdw_state stays NULL. + */ + if (eflags & EXEC_FLAG_EXPLAIN_ONLY) + return; + + /* + * We'll save private state in node->fdw_state. + */ + dpstate = (PgFdwDmlPushdownState *) palloc0(sizeof(PgFdwDmlPushdownState)); + node->fdw_state = (void *) dpstate; + + /* + * Identify which user to do the remote access as. This should match what + * ExecCheckRTEPerms() does. + */ + rte = rt_fetch(fsplan->scan.scanrelid, estate->es_range_table); + userid = rte->checkAsUser ? rte->checkAsUser : GetUserId(); + + /* Get info about foreign table. */ + dpstate->rel = node->ss.ss_currentRelation; + table = GetForeignTable(RelationGetRelid(dpstate->rel)); + server = GetForeignServer(table->serverid); + user = GetUserMapping(userid, server->serverid); + + /* + * Get connection to the foreign server. Connection manager will + * establish new connection if necessary. + */ + dpstate->conn = GetConnection(server, user, false); + + /* Initialize state variable */ + dpstate->num_tuples = -1; /* -1 means not set yet */ + + /* Get private info created by planner functions. */ + dpstate->query = strVal(list_nth(fsplan->fdw_private, + FdwDmlPushdownPrivateUpdateSql)); + dpstate->has_returning = intVal(list_nth(fsplan->fdw_private, + FdwDmlPushdownPrivateHasReturning)); + dpstate->retrieved_attrs = (List *) list_nth(fsplan->fdw_private, + FdwDmlPushdownPrivateRetrievedAttrs); + dpstate->set_processed = intVal(list_nth(fsplan->fdw_private, + FdwDmlPushdownPrivateSetProcessed)); + + /* Create contexts for batches of tuples and per-tuple temp workspace. */ + dpstate->batch_cxt = AllocSetContextCreate(estate->es_query_cxt, + "postgres_fdw tuple data", + ALLOCSET_DEFAULT_MINSIZE, + ALLOCSET_DEFAULT_INITSIZE, + ALLOCSET_DEFAULT_MAXSIZE); + dpstate->temp_cxt = AllocSetContextCreate(estate->es_query_cxt, + "postgres_fdw temporary data", + ALLOCSET_SMALL_MINSIZE, + ALLOCSET_SMALL_INITSIZE, + ALLOCSET_SMALL_MAXSIZE); + + /* Prepare for input conversion of RETURNING results. */ + if (dpstate->has_returning) + dpstate->attinmeta = TupleDescGetAttInMetadata(RelationGetDescr(dpstate->rel)); + + /* Prepare for output conversion of parameters used in remote query. */ + numParams = list_length(fsplan->fdw_exprs); + dpstate->numParams = numParams; + dpstate->param_flinfo = (FmgrInfo *) palloc0(sizeof(FmgrInfo) * numParams); + + i = 0; + foreach(lc, fsplan->fdw_exprs) + { + Node *param_expr = (Node *) lfirst(lc); + Oid typefnoid; + bool isvarlena; + + getTypeOutputInfo(exprType(param_expr), &typefnoid, &isvarlena); + fmgr_info(typefnoid, &dpstate->param_flinfo[i]); + i++; + } + + /* + * Prepare remote-parameter expressions for evaluation. (Note: in + * practice, we expect that all these expressions will be just Params, so + * we could possibly do something more efficient than using the full + * expression-eval machinery for this. But probably there would be little + * benefit, and it'd require postgres_fdw to know more than is desirable + * about Param evaluation.) + */ + dpstate->param_exprs = (List *) + ExecInitExpr((Expr *) fsplan->fdw_exprs, + (PlanState *) node); + + /* + * Allocate buffer for text form of query parameters, if any. + */ + if (numParams > 0) + dpstate->param_values = (const char **) palloc0(numParams * sizeof(char *)); + else + dpstate->param_values = NULL; + } + + /* + * postgresIterateDMLPushdown + * Get the result of a pushed-down update/delete operation on a foreign table + */ + static TupleTableSlot * + postgresIterateDMLPushdown(ForeignScanState *node) + { + PgFdwDmlPushdownState *dpstate = (PgFdwDmlPushdownState *) node->fdw_state; + EState *estate = node->ss.ps.state; + ResultRelInfo *resultRelInfo = estate->es_result_relation_info; + + /* + * If this is the first call after Begin, execute the statement. + */ + if (dpstate->num_tuples == -1) + execute_dml_stmt(node); + + /* + * If the given query doesn't specify RETURNING, just return an empty slot. + */ + if (!resultRelInfo->ri_projectReturning) + { + TupleTableSlot *slot = node->ss.ss_ScanTupleSlot; + Instrumentation *instr = node->ss.ps.instrument; + + Assert(!dpstate->has_returning); + + /* Increment the command es_processed count if necessary. */ + if (dpstate->set_processed) + estate->es_processed += dpstate->num_tuples; + + /* Increment the tuple count for EXPLAIN ANALYZE if necessary. */ + if (node->ss.ps.instrument) + instr->ntuples += dpstate->num_tuples; + + return ExecClearTuple(slot); + } + + /* + * Get the next returning result. + */ + return get_returning_result(node); + } + + /* + * postgresEndDMLPushdown + * Finish a pushed-down update/delete operation on a foreign table + */ + static void + postgresEndDMLPushdown(ForeignScanState *node) + { + PgFdwDmlPushdownState *dpstate = (PgFdwDmlPushdownState *) node->fdw_state; + + /* if dpstate is NULL, we are in EXPLAIN; nothing to do */ + if (dpstate == NULL) + return; + + /* Clean up */ + if (dpstate->result) + PQclear(dpstate->result); + + /* Release remote connection */ + ReleaseConnection(dpstate->conn); + dpstate->conn = NULL; + + /* MemoryContexts will be deleted automatically. */ + } + + /* * postgresIsForeignRelUpdatable * Determine whether a foreign table supports INSERT, UPDATE and/or * DELETE. *************** *** 1919,1924 **** postgresExplainForeignModify(ModifyTableState *mtstate, --- 2319,2343 ---- } } + /* + * postgresExplainDMLPushdown + * Produce extra output for EXPLAIN of a ForeignScan for a pushed-down + * update/delete operation on a foreign table + */ + static void + postgresExplainDMLPushdown(ForeignScanState *node, ExplainState *es) + { + List *fdw_private; + char *sql; + + if (es->verbose) + { + fdw_private = ((ForeignScan *) node->ss.ps.plan)->fdw_private; + sql = strVal(list_nth(fdw_private, FdwDmlPushdownPrivateUpdateSql)); + ExplainPropertyText("Remote SQL", sql, es); + } + } + /* * estimate_path_cost_size *************** *** 2510,2527 **** convert_prep_stmt_params(PgFdwModifyState *fmstate, * have PG_TRY blocks to ensure this happens. */ static void ! store_returning_result(PgFdwModifyState *fmstate, ! TupleTableSlot *slot, PGresult *res) { PG_TRY(); { HeapTuple newtup; ! newtup = make_tuple_from_result_row(res, 0, ! fmstate->rel, ! fmstate->attinmeta, ! fmstate->retrieved_attrs, ! fmstate->temp_cxt); /* tuple will be deleted when it is cleared from the slot */ ExecStoreTuple(newtup, slot, InvalidBuffer, true); } --- 2929,2951 ---- * have PG_TRY blocks to ensure this happens. */ static void ! store_returning_result(TupleTableSlot *slot, ! PGresult *res, ! int row, ! Relation rel, ! AttInMetadata *attinmeta, ! List *retrieved_attrs, ! MemoryContext temp_context) { PG_TRY(); { HeapTuple newtup; ! newtup = make_tuple_from_result_row(res, row, ! rel, ! attinmeta, ! retrieved_attrs, ! temp_context); /* tuple will be deleted when it is cleared from the slot */ ExecStoreTuple(newtup, slot, InvalidBuffer, true); } *************** *** 2535,2540 **** store_returning_result(PgFdwModifyState *fmstate, --- 2959,3183 ---- } /* + * Check to see whether it's safe to push an UPDATE/DELETE command down. + * + * Conditions checked here: + * + * 1. If there are any local joins needed, we mustn't push the command down, + * because that breaks execution of the joins. + * + * 2. If there are any quals that can't be evaluated remotely, we mustn't push + * the command down, because that breaks evaluation of the quals. + * + * 3. If the target relation has any row-level local triggers, we mustn't push + * the command down, because that breaks execution of the triggers. + * + * 4. For UPDATE, if it is unsafe to evaluate on the remote server any + * expressions to assign to the target columns, we can't push the command down. + */ + static bool + dml_is_pushdown_safe(PlannerInfo *root, + ModifyTable *plan, + Index resultRelation, + int subplan_index, + Relation rel, + List *targetAttrs) + { + CmdType operation = plan->operation; + RelOptInfo *baserel = root->simple_rel_array[resultRelation]; + Plan *subplan = (Plan *) list_nth(plan->plans, subplan_index); + ListCell *lc; + + Assert(operation == CMD_UPDATE || operation == CMD_DELETE); + + /* Check point 1 */ + if (nodeTag(subplan) != T_ForeignScan) + return false; + + /* Check point 2 */ + if (subplan->qual != NIL) + return false; + + /* Check point 3 */ + if (relation_has_row_level_triggers(rel, operation)) + return false; + + /* Check point 4 */ + foreach(lc, targetAttrs) + { + int attnum = lfirst_int(lc); + TargetEntry *tle = get_tle_by_resno(subplan->targetlist, + attnum); + + if (!is_foreign_expr(root, baserel, (Expr *) tle->expr)) + return false; + } + + return true; + } + + /* + * relation_has_row_level_triggers + * Determine if relation has any row-level triggers for a given operation. + */ + static bool + relation_has_row_level_triggers(Relation rel, CmdType operation) + { + bool result = false; + + Assert(rel != NULL); + + if (rel->trigdesc == NULL) + return result; + + switch (operation) + { + case CMD_INSERT: + result = (rel->trigdesc->trig_insert_after_row || + rel->trigdesc->trig_insert_before_row); + break; + case CMD_UPDATE: + result = (rel->trigdesc->trig_update_after_row || + rel->trigdesc->trig_update_before_row); + break; + case CMD_DELETE: + result = (rel->trigdesc->trig_delete_after_row || + rel->trigdesc->trig_delete_before_row); + break; + default: + elog(ERROR, "unexpected operation: %d", (int) operation); + break; + } + return result; + } + + /* + * Execute a pushed-down UPDATE/DELETE statement. + */ + static void + execute_dml_stmt(ForeignScanState *node) + { + PgFdwDmlPushdownState *dpstate = (PgFdwDmlPushdownState *) node->fdw_state; + ExprContext *econtext = node->ss.ps.ps_ExprContext; + int numParams = dpstate->numParams; + const char **values = dpstate->param_values; + + /* + * Construct array of query parameter values in text format. We do the + * conversions in the short-lived per-tuple context, so as not to cause a + * memory leak over repeated scans. + */ + if (numParams > 0) + { + int nestlevel; + MemoryContext oldcontext; + int i; + ListCell *lc; + + oldcontext = MemoryContextSwitchTo(econtext->ecxt_per_tuple_memory); + + nestlevel = set_transmission_modes(); + + i = 0; + foreach(lc, dpstate->param_exprs) + { + ExprState *expr_state = (ExprState *) lfirst(lc); + Datum expr_value; + bool isNull; + + /* Evaluate the parameter expression */ + expr_value = ExecEvalExpr(expr_state, econtext, &isNull, NULL); + + /* + * Get string representation of each parameter value by invoking + * type-specific output function, unless the value is null. + */ + if (isNull) + values[i] = NULL; + else + values[i] = OutputFunctionCall(&dpstate->param_flinfo[i], + expr_value); + i++; + } + + reset_transmission_modes(nestlevel); + + MemoryContextSwitchTo(oldcontext); + } + + /* + * Notice that we pass NULL for paramTypes, thus forcing the remote server + * to infer types for all parameters. Since we explicitly cast every + * parameter (see deparse.c), the "inference" is trivial and will produce + * the desired result. This allows us to avoid assuming that the remote + * server has the same OIDs we do for the parameters' types. + * + * We don't use a PG_TRY block here, so be careful not to throw error + * without releasing the PGresult. + */ + dpstate->result = PQexecParams(dpstate->conn, dpstate->query, + numParams, NULL, values, NULL, NULL, 0); + if (PQresultStatus(dpstate->result) != + (dpstate->has_returning ? PGRES_TUPLES_OK : PGRES_COMMAND_OK)) + pgfdw_report_error(ERROR, dpstate->result, dpstate->conn, true, + dpstate->query); + + /* Get the number of rows affected. */ + if (dpstate->has_returning) + dpstate->num_tuples = PQntuples(dpstate->result); + else + dpstate->num_tuples = atoi(PQcmdTuples(dpstate->result)); + } + + /* + * Get the result of an UPDATE/DELETE RETURNING. + */ + static TupleTableSlot * + get_returning_result(ForeignScanState *node) + { + PgFdwDmlPushdownState *dpstate = (PgFdwDmlPushdownState *) node->fdw_state; + TupleTableSlot *slot = node->ss.ss_ScanTupleSlot; + EState *estate = node->ss.ps.state; + ResultRelInfo *resultRelInfo = estate->es_result_relation_info; + MemoryContext oldcontext; + + Assert(resultRelInfo->ri_projectReturning); + + /* If we didn't get any tuples, must be end of data. */ + if (dpstate->next_tuple >= dpstate->num_tuples) + return ExecClearTuple(slot); + + /* Increment the command es_processed count if necessary. */ + if (dpstate->set_processed) + estate->es_processed += 1; + + /* OK, we'll store RETURNING tuples. */ + if (!dpstate->has_returning) + ExecStoreAllNullTuple(slot); + else + { + oldcontext = MemoryContextSwitchTo(dpstate->batch_cxt); + + /* Fetch the next tuple. */ + store_returning_result(slot, + dpstate->result, + dpstate->next_tuple, + dpstate->rel, + dpstate->attinmeta, + dpstate->retrieved_attrs, + dpstate->temp_cxt); + + MemoryContextSwitchTo(oldcontext); + } + dpstate->next_tuple++; + + /* Set tuple for later evaluation of the RETURNING list. */ + resultRelInfo->ri_projectReturning->pi_exprContext->ecxt_scantuple = slot; + + return slot; + } + + /* * postgresAnalyzeForeignTable * Test whether analyzing this foreign table is supported */ *** a/contrib/postgres_fdw/postgres_fdw.h --- b/contrib/postgres_fdw/postgres_fdw.h *************** *** 103,112 **** extern void deparseUpdateSql(StringInfo buf, PlannerInfo *root, --- 103,126 ---- Index rtindex, Relation rel, List *targetAttrs, List *returningList, List **retrieved_attrs); + extern void deparsePushedDownUpdateSql(StringInfo buf, PlannerInfo *root, + Index rtindex, Relation rel, + List *targetlist, + List *targetAttrs, + List *remote_conds, + List **params_list, + List *returningList, + List **retrieved_attrs); extern void deparseDeleteSql(StringInfo buf, PlannerInfo *root, Index rtindex, Relation rel, List *returningList, List **retrieved_attrs); + extern void deparsePushedDownDeleteSql(StringInfo buf, PlannerInfo *root, + Index rtindex, Relation rel, + List *remote_conds, + List **params_list, + List *returningList, + List **retrieved_attrs); extern void deparseAnalyzeSizeSql(StringInfo buf, Relation rel); extern void deparseAnalyzeSql(StringInfo buf, Relation rel, List **retrieved_attrs); *** a/contrib/postgres_fdw/sql/postgres_fdw.sql --- b/contrib/postgres_fdw/sql/postgres_fdw.sql *************** *** 399,418 **** INSERT INTO ft2 (c1,c2,c3) SELECT c1+1000,c2+100, c3 || c3 FROM ft2 LIMIT 20; INSERT INTO ft2 (c1,c2,c3) VALUES (1101,201,'aaa'), (1102,202,'bbb'), (1103,203,'ccc') RETURNING *; INSERT INTO ft2 (c1,c2,c3) VALUES (1104,204,'ddd'), (1105,205,'eee'); UPDATE ft2 SET c2 = c2 + 300, c3 = c3 || '_update3' WHERE c1 % 10 = 3; UPDATE ft2 SET c2 = c2 + 400, c3 = c3 || '_update7' WHERE c1 % 10 = 7 RETURNING *; EXPLAIN (verbose, costs off) UPDATE ft2 SET c2 = ft2.c2 + 500, c3 = ft2.c3 || '_update9', c7 = DEFAULT ! FROM ft1 WHERE ft1.c1 = ft2.c2 AND ft1.c1 % 10 = 9; UPDATE ft2 SET c2 = ft2.c2 + 500, c3 = ft2.c3 || '_update9', c7 = DEFAULT FROM ft1 WHERE ft1.c1 = ft2.c2 AND ft1.c1 % 10 = 9; EXPLAIN (verbose, costs off) ! DELETE FROM ft2 WHERE c1 % 10 = 5 RETURNING c1, c4; DELETE FROM ft2 WHERE c1 % 10 = 5 RETURNING c1, c4; EXPLAIN (verbose, costs off) ! DELETE FROM ft2 USING ft1 WHERE ft1.c1 = ft2.c2 AND ft1.c1 % 10 = 2; DELETE FROM ft2 USING ft1 WHERE ft1.c1 = ft2.c2 AND ft1.c1 % 10 = 2; SELECT c1,c2,c3,c4 FROM ft2 ORDER BY c1; -- Test that trigger on remote table works as expected CREATE OR REPLACE FUNCTION "S 1".F_BRTRIG() RETURNS trigger AS $$ --- 399,425 ---- INSERT INTO ft2 (c1,c2,c3) VALUES (1101,201,'aaa'), (1102,202,'bbb'), (1103,203,'ccc') RETURNING *; INSERT INTO ft2 (c1,c2,c3) VALUES (1104,204,'ddd'), (1105,205,'eee'); + EXPLAIN (verbose, costs off) + UPDATE ft2 SET c2 = c2 + 300, c3 = c3 || '_update3' WHERE c1 % 10 = 3; -- can be pushed down UPDATE ft2 SET c2 = c2 + 300, c3 = c3 || '_update3' WHERE c1 % 10 = 3; + EXPLAIN (verbose, costs off) + UPDATE ft2 SET c2 = c2 + 400, c3 = c3 || '_update7' WHERE c1 % 10 = 7 RETURNING *; -- can be pushed down UPDATE ft2 SET c2 = c2 + 400, c3 = c3 || '_update7' WHERE c1 % 10 = 7 RETURNING *; EXPLAIN (verbose, costs off) UPDATE ft2 SET c2 = ft2.c2 + 500, c3 = ft2.c3 || '_update9', c7 = DEFAULT ! FROM ft1 WHERE ft1.c1 = ft2.c2 AND ft1.c1 % 10 = 9; -- can't be pushed down UPDATE ft2 SET c2 = ft2.c2 + 500, c3 = ft2.c3 || '_update9', c7 = DEFAULT FROM ft1 WHERE ft1.c1 = ft2.c2 AND ft1.c1 % 10 = 9; EXPLAIN (verbose, costs off) ! DELETE FROM ft2 WHERE c1 % 10 = 5 RETURNING c1, c4; -- can be pushed down DELETE FROM ft2 WHERE c1 % 10 = 5 RETURNING c1, c4; EXPLAIN (verbose, costs off) ! DELETE FROM ft2 USING ft1 WHERE ft1.c1 = ft2.c2 AND ft1.c1 % 10 = 2; -- can't be pushed down DELETE FROM ft2 USING ft1 WHERE ft1.c1 = ft2.c2 AND ft1.c1 % 10 = 2; SELECT c1,c2,c3,c4 FROM ft2 ORDER BY c1; + EXPLAIN (verbose, costs off) + UPDATE ft2 SET c7 = DEFAULT WHERE c1 % 10 = 0 AND date(c4) = '1970-01-01'::date; -- can't be pushed down + UPDATE ft2 SET c7 = DEFAULT WHERE c1 % 10 = 0 AND date(c4) = '1970-01-01'::date; -- Test that trigger on remote table works as expected CREATE OR REPLACE FUNCTION "S 1".F_BRTRIG() RETURNS trigger AS $$ *************** *** 728,733 **** UPDATE rem1 SET f2 = 'testo'; --- 735,824 ---- -- Test returning a system attribute INSERT INTO rem1(f2) VALUES ('test') RETURNING ctid; + -- cleanup + DROP TRIGGER trig_row_before ON rem1; + DROP TRIGGER trig_row_after ON rem1; + DROP TRIGGER trig_local_before ON loc1; + + + -- Test update-pushdown functionality + + -- Test with statement-level triggers + CREATE TRIGGER trig_stmt_before + BEFORE DELETE OR INSERT OR UPDATE ON rem1 + FOR EACH STATEMENT EXECUTE PROCEDURE trigger_func(); + EXPLAIN (verbose, costs off) + UPDATE rem1 set f2 = ''; -- can be pushed down + EXPLAIN (verbose, costs off) + DELETE FROM rem1; -- can be pushed down + DROP TRIGGER trig_stmt_before ON rem1; + + CREATE TRIGGER trig_stmt_after + AFTER DELETE OR INSERT OR UPDATE ON rem1 + FOR EACH STATEMENT EXECUTE PROCEDURE trigger_func(); + EXPLAIN (verbose, costs off) + UPDATE rem1 set f2 = ''; -- can be pushed down + EXPLAIN (verbose, costs off) + DELETE FROM rem1; -- can be pushed down + DROP TRIGGER trig_stmt_after ON rem1; + + -- Test with row-level ON INSERT triggers + CREATE TRIGGER trig_row_before_insert + BEFORE INSERT ON rem1 + FOR EACH ROW EXECUTE PROCEDURE trigger_data(23,'skidoo'); + EXPLAIN (verbose, costs off) + UPDATE rem1 set f2 = ''; -- can be pushed down + EXPLAIN (verbose, costs off) + DELETE FROM rem1; -- can be pushed down + DROP TRIGGER trig_row_before_insert ON rem1; + + CREATE TRIGGER trig_row_after_insert + AFTER INSERT ON rem1 + FOR EACH ROW EXECUTE PROCEDURE trigger_data(23,'skidoo'); + EXPLAIN (verbose, costs off) + UPDATE rem1 set f2 = ''; -- can be pushed down + EXPLAIN (verbose, costs off) + DELETE FROM rem1; -- can be pushed down + DROP TRIGGER trig_row_after_insert ON rem1; + + -- Test with row-level ON UPDATE triggers + CREATE TRIGGER trig_row_before_update + BEFORE UPDATE ON rem1 + FOR EACH ROW EXECUTE PROCEDURE trigger_data(23,'skidoo'); + EXPLAIN (verbose, costs off) + UPDATE rem1 set f2 = ''; -- can't be pushed down + EXPLAIN (verbose, costs off) + DELETE FROM rem1; -- can be pushed down + DROP TRIGGER trig_row_before_update ON rem1; + + CREATE TRIGGER trig_row_after_update + AFTER UPDATE ON rem1 + FOR EACH ROW EXECUTE PROCEDURE trigger_data(23,'skidoo'); + EXPLAIN (verbose, costs off) + UPDATE rem1 set f2 = ''; -- can't be pushed down + EXPLAIN (verbose, costs off) + DELETE FROM rem1; -- can be pushed down + DROP TRIGGER trig_row_after_update ON rem1; + + -- Test with row-level ON DELETE triggers + CREATE TRIGGER trig_row_before_delete + BEFORE DELETE ON rem1 + FOR EACH ROW EXECUTE PROCEDURE trigger_data(23,'skidoo'); + EXPLAIN (verbose, costs off) + UPDATE rem1 set f2 = ''; -- can be pushed down + EXPLAIN (verbose, costs off) + DELETE FROM rem1; -- can't be pushed down + DROP TRIGGER trig_row_before_delete ON rem1; + + CREATE TRIGGER trig_row_after_delete + AFTER DELETE ON rem1 + FOR EACH ROW EXECUTE PROCEDURE trigger_data(23,'skidoo'); + EXPLAIN (verbose, costs off) + UPDATE rem1 set f2 = ''; -- can be pushed down + EXPLAIN (verbose, costs off) + DELETE FROM rem1; -- can't be pushed down + DROP TRIGGER trig_row_after_delete ON rem1; + -- =================================================================== -- test inheritance features -- =================================================================== *** a/doc/src/sgml/postgres-fdw.sgml --- b/doc/src/sgml/postgres-fdw.sgml *************** *** 471,476 **** --- 471,485 ---- extension that's listed in the foreign server's extensions option. Operators and functions in such clauses must be IMMUTABLE as well. + For an UPDATE or DELETE query, + postgres_fdw attempts to optimize the query execution by + sending the whole query to the remote server if there are no query + WHERE clauses that cannot be sent to the remote server, + no local joins for the query, or no row-level local BEFORE or + AFTER triggers on the target table. In UPDATE, + expressions to assign to target columns must use only built-in data types, + IMMUTABLE operators, and IMMUTABLE functions, + to reduce the risk of misexecution of the query. *** a/src/backend/commands/explain.c --- b/src/backend/commands/explain.c *************** *** 888,894 **** ExplainNode(PlanState *planstate, List *ancestors, pname = sname = "WorkTable Scan"; break; case T_ForeignScan: ! pname = sname = "Foreign Scan"; break; case T_CustomScan: sname = "Custom Scan"; --- 888,912 ---- pname = sname = "WorkTable Scan"; break; case T_ForeignScan: ! sname = "Foreign Scan"; ! switch (((ForeignScan *) plan)->operation) ! { ! case CMD_SELECT: ! pname = "Foreign Scan"; ! operation = "Select"; ! break; ! case CMD_UPDATE: ! pname = "Foreign Update"; ! operation = "Update"; ! break; ! case CMD_DELETE: ! pname = "Foreign Delete"; ! operation = "Delete"; ! break; ! default: ! pname = "???"; ! break; ! } break; case T_CustomScan: sname = "Custom Scan"; *************** *** 1624,1629 **** show_plan_tlist(PlanState *planstate, List *ancestors, ExplainState *es) --- 1642,1652 ---- return; if (IsA(plan, RecursiveUnion)) return; + /* Likewise for ForeignScan in case of pushed-down UPDATE/DELETE */ + if (IsA(plan, ForeignScan) && + (((ForeignScan *) plan)->operation == CMD_UPDATE || + ((ForeignScan *) plan)->operation == CMD_DELETE)) + return; /* Set up deparsing context */ context = set_deparse_context_planstate(es->deparse_cxt, *************** *** 2212,2219 **** show_foreignscan_info(ForeignScanState *fsstate, ExplainState *es) FdwRoutine *fdwroutine = fsstate->fdwroutine; /* Let the FDW emit whatever fields it wants */ ! if (fdwroutine->ExplainForeignScan != NULL) ! fdwroutine->ExplainForeignScan(fsstate, es); } /* --- 2235,2250 ---- FdwRoutine *fdwroutine = fsstate->fdwroutine; /* Let the FDW emit whatever fields it wants */ ! if (((ForeignScan *) fsstate->ss.ps.plan)->operation != CMD_SELECT) ! { ! if (fdwroutine->ExplainDMLPushdown != NULL) ! fdwroutine->ExplainDMLPushdown(fsstate, es); ! } ! else ! { ! if (fdwroutine->ExplainForeignScan != NULL) ! fdwroutine->ExplainForeignScan(fsstate, es); ! } } /* *************** *** 2599,2606 **** show_modifytable_info(ModifyTableState *mtstate, List *ancestors, } } ! /* Give FDW a chance */ ! if (fdwroutine && fdwroutine->ExplainForeignModify != NULL) { List *fdw_private = (List *) list_nth(node->fdwPrivLists, j); --- 2630,2639 ---- } } ! /* Give FDW a chance if needed */ ! if (!resultRelInfo->ri_FdwPushdown && ! fdwroutine && ! fdwroutine->ExplainForeignModify != NULL) { List *fdw_private = (List *) list_nth(node->fdwPrivLists, j); *** a/src/backend/executor/execMain.c --- b/src/backend/executor/execMain.c *************** *** 1011,1020 **** InitPlan(QueryDesc *queryDesc, int eflags) * CheckValidRowMarkRel. */ void ! CheckValidResultRel(Relation resultRel, CmdType operation) { TriggerDesc *trigDesc = resultRel->trigdesc; FdwRoutine *fdwroutine; switch (resultRel->rd_rel->relkind) { --- 1011,1022 ---- * CheckValidRowMarkRel. */ void ! CheckValidResultRel(ResultRelInfo *resultRelInfo, CmdType operation) { + Relation resultRel = resultRelInfo->ri_RelationDesc; TriggerDesc *trigDesc = resultRel->trigdesc; FdwRoutine *fdwroutine; + bool allow_pushdown; switch (resultRel->rd_rel->relkind) { *************** *** 1083,1096 **** CheckValidResultRel(Relation resultRel, CmdType operation) case RELKIND_FOREIGN_TABLE: /* Okay only if the FDW supports it */ fdwroutine = GetFdwRoutineForRelation(resultRel, false); switch (operation) { case CMD_INSERT: ! if (fdwroutine->ExecForeignInsert == NULL) ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), ! errmsg("cannot insert into foreign table \"%s\"", ! RelationGetRelationName(resultRel)))); if (fdwroutine->IsForeignRelUpdatable != NULL && (fdwroutine->IsForeignRelUpdatable(resultRel) & (1 << CMD_INSERT)) == 0) ereport(ERROR, --- 1085,1107 ---- case RELKIND_FOREIGN_TABLE: /* Okay only if the FDW supports it */ fdwroutine = GetFdwRoutineForRelation(resultRel, false); + allow_pushdown = ((fdwroutine->BeginDMLPushdown != NULL) && + (fdwroutine->IterateDMLPushdown != NULL) && + (fdwroutine->EndDMLPushdown != NULL)); switch (operation) { case CMD_INSERT: ! if (resultRelInfo->ri_FdwPushdown && !allow_pushdown) ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), ! errmsg("cannot push down insert on foreign table \"%s\"", ! RelationGetRelationName(resultRel)))); ! if (!resultRelInfo->ri_FdwPushdown && ! fdwroutine->ExecForeignInsert == NULL) ! ereport(ERROR, ! (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), ! errmsg("cannot insert into foreign table \"%s\"", ! RelationGetRelationName(resultRel)))); if (fdwroutine->IsForeignRelUpdatable != NULL && (fdwroutine->IsForeignRelUpdatable(resultRel) & (1 << CMD_INSERT)) == 0) ereport(ERROR, *************** *** 1099,1105 **** CheckValidResultRel(Relation resultRel, CmdType operation) RelationGetRelationName(resultRel)))); break; case CMD_UPDATE: ! if (fdwroutine->ExecForeignUpdate == NULL) ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), errmsg("cannot update foreign table \"%s\"", --- 1110,1122 ---- RelationGetRelationName(resultRel)))); break; case CMD_UPDATE: ! if (resultRelInfo->ri_FdwPushdown && !allow_pushdown) ! ereport(ERROR, ! (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), ! errmsg("cannot push down update on foreign table \"%s\"", ! RelationGetRelationName(resultRel)))); ! if (!resultRelInfo->ri_FdwPushdown && ! fdwroutine->ExecForeignUpdate == NULL) ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), errmsg("cannot update foreign table \"%s\"", *************** *** 1112,1118 **** CheckValidResultRel(Relation resultRel, CmdType operation) RelationGetRelationName(resultRel)))); break; case CMD_DELETE: ! if (fdwroutine->ExecForeignDelete == NULL) ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), errmsg("cannot delete from foreign table \"%s\"", --- 1129,1141 ---- RelationGetRelationName(resultRel)))); break; case CMD_DELETE: ! if (resultRelInfo->ri_FdwPushdown && !allow_pushdown) ! ereport(ERROR, ! (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), ! errmsg("cannot push down delete on foreign table \"%s\"", ! RelationGetRelationName(resultRel)))); ! if (!resultRelInfo->ri_FdwPushdown && ! fdwroutine->ExecForeignDelete == NULL) ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), errmsg("cannot delete from foreign table \"%s\"", *************** *** 1245,1250 **** InitResultRelInfo(ResultRelInfo *resultRelInfo, --- 1268,1274 ---- else resultRelInfo->ri_FdwRoutine = NULL; resultRelInfo->ri_FdwState = NULL; + resultRelInfo->ri_FdwPushdown = false; resultRelInfo->ri_ConstraintExprs = NULL; resultRelInfo->ri_junkFilter = NULL; resultRelInfo->ri_projectReturning = NULL; *** a/src/backend/executor/nodeForeignscan.c --- b/src/backend/executor/nodeForeignscan.c *************** *** 48,54 **** ForeignNext(ForeignScanState *node) /* Call the Iterate function in short-lived context */ oldcontext = MemoryContextSwitchTo(econtext->ecxt_per_tuple_memory); ! slot = node->fdwroutine->IterateForeignScan(node); MemoryContextSwitchTo(oldcontext); /* --- 48,57 ---- /* Call the Iterate function in short-lived context */ oldcontext = MemoryContextSwitchTo(econtext->ecxt_per_tuple_memory); ! if (plan->operation != CMD_SELECT) ! slot = node->fdwroutine->IterateDMLPushdown(node); ! else ! slot = node->fdwroutine->IterateForeignScan(node); MemoryContextSwitchTo(oldcontext); /* *************** *** 226,232 **** ExecInitForeignScan(ForeignScan *node, EState *estate, int eflags) /* * Tell the FDW to initialize the scan. */ ! fdwroutine->BeginForeignScan(scanstate, eflags); return scanstate; } --- 229,238 ---- /* * Tell the FDW to initialize the scan. */ ! if (node->operation != CMD_SELECT) ! fdwroutine->BeginDMLPushdown(scanstate, eflags); ! else ! fdwroutine->BeginForeignScan(scanstate, eflags); return scanstate; } *************** *** 240,247 **** ExecInitForeignScan(ForeignScan *node, EState *estate, int eflags) void ExecEndForeignScan(ForeignScanState *node) { /* Let the FDW shut down */ ! node->fdwroutine->EndForeignScan(node); /* Shut down any outer plan. */ if (outerPlanState(node)) --- 246,258 ---- void ExecEndForeignScan(ForeignScanState *node) { + ForeignScan *plan = (ForeignScan *) node->ss.ps.plan; + /* Let the FDW shut down */ ! if (plan->operation != CMD_SELECT) ! node->fdwroutine->EndDMLPushdown(node); ! else ! node->fdwroutine->EndForeignScan(node); /* Shut down any outer plan. */ if (outerPlanState(node)) *** a/src/backend/executor/nodeModifyTable.c --- b/src/backend/executor/nodeModifyTable.c *************** *** 1357,1362 **** ExecModifyTable(ModifyTableState *node) --- 1357,1383 ---- break; } + if (resultRelInfo->ri_FdwPushdown && resultRelInfo->ri_projectReturning) + { + ProjectionInfo *projectReturning = resultRelInfo->ri_projectReturning; + ExprContext *econtext = projectReturning->pi_exprContext; + + /* + * Reset per-tuple memory context to free any expression evaluation + * storage allocated in the previous cycle. + */ + ResetExprContext(econtext); + + /* Make any needed join variables available to ExecProject */ + econtext->ecxt_outertuple = planSlot; + + /* Compute the RETURNING expressions */ + slot = ExecProject(resultRelInfo->ri_projectReturning, NULL); + + estate->es_result_relation_info = saved_resultRelInfo; + return slot; + } + EvalPlanQualSetSlot(&node->mt_epqstate, planSlot); slot = planSlot; *************** *** 1536,1545 **** ExecInitModifyTable(ModifyTable *node, EState *estate, int eflags) { subplan = (Plan *) lfirst(l); /* * Verify result relation is a valid target for the current operation */ ! CheckValidResultRel(resultRelInfo->ri_RelationDesc, operation); /* * If there are indices on the result relation, open them and save --- 1557,1569 ---- { subplan = (Plan *) lfirst(l); + /* Initialize the FdwPushdown flag */ + resultRelInfo->ri_FdwPushdown = list_nth_int(node->fdwPushdowns, i); + /* * Verify result relation is a valid target for the current operation */ ! CheckValidResultRel(resultRelInfo, operation); /* * If there are indices on the result relation, open them and save *************** *** 1560,1566 **** ExecInitModifyTable(ModifyTable *node, EState *estate, int eflags) mtstate->mt_plans[i] = ExecInitNode(subplan, estate, eflags); /* Also let FDWs init themselves for foreign-table result rels */ ! if (resultRelInfo->ri_FdwRoutine != NULL && resultRelInfo->ri_FdwRoutine->BeginForeignModify != NULL) { List *fdw_private = (List *) list_nth(node->fdwPrivLists, i); --- 1584,1591 ---- mtstate->mt_plans[i] = ExecInitNode(subplan, estate, eflags); /* Also let FDWs init themselves for foreign-table result rels */ ! if (!resultRelInfo->ri_FdwPushdown && ! resultRelInfo->ri_FdwRoutine != NULL && resultRelInfo->ri_FdwRoutine->BeginForeignModify != NULL) { List *fdw_private = (List *) list_nth(node->fdwPrivLists, i); *************** *** 1731,1743 **** ExecInitModifyTable(ModifyTable *node, EState *estate, int eflags) --- 1756,1777 ---- erm = ExecFindRowMark(estate, rc->rti, false); /* build ExecAuxRowMark for each subplan */ + resultRelInfo = mtstate->resultRelInfo; for (i = 0; i < nplans; i++) { ExecAuxRowMark *aerm; + /* ignore subplan if the FDW pushes the command down */ + if (resultRelInfo->ri_FdwPushdown) + { + resultRelInfo++; + continue; + } + subplan = mtstate->mt_plans[i]->plan; aerm = ExecBuildAuxRowMark(erm, subplan->targetlist); mtstate->mt_arowmarks[i] = lappend(mtstate->mt_arowmarks[i], aerm); + resultRelInfo++; } } *************** *** 1793,1798 **** ExecInitModifyTable(ModifyTable *node, EState *estate, int eflags) --- 1827,1839 ---- { JunkFilter *j; + /* ignore subplan if the FDW pushes the command down */ + if (resultRelInfo->ri_FdwPushdown) + { + resultRelInfo++; + continue; + } + subplan = mtstate->mt_plans[i]->plan; if (operation == CMD_INSERT || operation == CMD_UPDATE) ExecCheckPlanOutput(resultRelInfo->ri_RelationDesc, *************** *** 1887,1893 **** ExecEndModifyTable(ModifyTableState *node) { ResultRelInfo *resultRelInfo = node->resultRelInfo + i; ! if (resultRelInfo->ri_FdwRoutine != NULL && resultRelInfo->ri_FdwRoutine->EndForeignModify != NULL) resultRelInfo->ri_FdwRoutine->EndForeignModify(node->ps.state, resultRelInfo); --- 1928,1935 ---- { ResultRelInfo *resultRelInfo = node->resultRelInfo + i; ! if (!resultRelInfo->ri_FdwPushdown && ! resultRelInfo->ri_FdwRoutine != NULL && resultRelInfo->ri_FdwRoutine->EndForeignModify != NULL) resultRelInfo->ri_FdwRoutine->EndForeignModify(node->ps.state, resultRelInfo); *** a/src/backend/nodes/copyfuncs.c --- b/src/backend/nodes/copyfuncs.c *************** *** 186,191 **** _copyModifyTable(const ModifyTable *from) --- 186,192 ---- COPY_NODE_FIELD(withCheckOptionLists); COPY_NODE_FIELD(returningLists); COPY_NODE_FIELD(fdwPrivLists); + COPY_NODE_FIELD(fdwPushdowns); COPY_NODE_FIELD(rowMarks); COPY_SCALAR_FIELD(epqParam); COPY_SCALAR_FIELD(onConflictAction); *************** *** 645,650 **** _copyForeignScan(const ForeignScan *from) --- 646,652 ---- /* * copy remainder of node */ + COPY_SCALAR_FIELD(operation); COPY_SCALAR_FIELD(fs_server); COPY_NODE_FIELD(fdw_exprs); COPY_NODE_FIELD(fdw_private); *** a/src/backend/nodes/outfuncs.c --- b/src/backend/nodes/outfuncs.c *************** *** 340,345 **** _outModifyTable(StringInfo str, const ModifyTable *node) --- 340,346 ---- WRITE_NODE_FIELD(withCheckOptionLists); WRITE_NODE_FIELD(returningLists); WRITE_NODE_FIELD(fdwPrivLists); + WRITE_NODE_FIELD(fdwPushdowns); WRITE_NODE_FIELD(rowMarks); WRITE_INT_FIELD(epqParam); WRITE_ENUM_FIELD(onConflictAction, OnConflictAction); *************** *** 591,596 **** _outForeignScan(StringInfo str, const ForeignScan *node) --- 592,598 ---- _outScanInfo(str, (const Scan *) node); + WRITE_ENUM_FIELD(operation, CmdType); WRITE_OID_FIELD(fs_server); WRITE_NODE_FIELD(fdw_exprs); WRITE_NODE_FIELD(fdw_private); *** a/src/backend/nodes/readfuncs.c --- b/src/backend/nodes/readfuncs.c *************** *** 1471,1476 **** _readModifyTable(void) --- 1471,1477 ---- READ_NODE_FIELD(withCheckOptionLists); READ_NODE_FIELD(returningLists); READ_NODE_FIELD(fdwPrivLists); + READ_NODE_FIELD(fdwPushdowns); READ_NODE_FIELD(rowMarks); READ_INT_FIELD(epqParam); READ_ENUM_FIELD(onConflictAction, OnConflictAction); *** a/src/backend/optimizer/plan/createplan.c --- b/src/backend/optimizer/plan/createplan.c *************** *** 3768,3773 **** make_foreignscan(List *qptlist, --- 3768,3774 ---- plan->lefttree = outer_plan; plan->righttree = NULL; node->scan.scanrelid = scanrelid; + node->operation = CMD_SELECT; /* fs_server will be filled in by create_foreignscan_plan */ node->fs_server = InvalidOid; node->fdw_exprs = fdw_exprs; *************** *** 5043,5048 **** make_modifytable(PlannerInfo *root, --- 5044,5050 ---- Plan *plan = &node->plan; double total_size; List *fdw_private_list; + List *fdwpushdown_list; ListCell *subnode; ListCell *lc; int i; *************** *** 5123,5134 **** make_modifytable(PlannerInfo *root, --- 5125,5138 ---- * construct private plan data, and accumulate it all into a list. */ fdw_private_list = NIL; + fdwpushdown_list = NIL; i = 0; foreach(lc, resultRelations) { Index rti = lfirst_int(lc); FdwRoutine *fdwroutine; List *fdw_private; + bool fdwpushdown; /* * If possible, we want to get the FdwRoutine from our RelOptInfo for *************** *** 5156,5161 **** make_modifytable(PlannerInfo *root, --- 5160,5173 ---- } if (fdwroutine != NULL && + fdwroutine->PlanDMLPushdown != NULL) + fdwpushdown = fdwroutine->PlanDMLPushdown(root, node, rti, i); + else + fdwpushdown = false; + fdwpushdown_list = lappend_int(fdwpushdown_list, fdwpushdown); + + if (!fdwpushdown && + fdwroutine != NULL && fdwroutine->PlanForeignModify != NULL) fdw_private = fdwroutine->PlanForeignModify(root, node, rti, i); else *************** *** 5164,5169 **** make_modifytable(PlannerInfo *root, --- 5176,5182 ---- i++; } node->fdwPrivLists = fdw_private_list; + node->fdwPushdowns = fdwpushdown_list; return node; } *** a/src/include/executor/executor.h --- b/src/include/executor/executor.h *************** *** 184,190 **** extern void ExecutorEnd(QueryDesc *queryDesc); extern void standard_ExecutorEnd(QueryDesc *queryDesc); extern void ExecutorRewind(QueryDesc *queryDesc); extern bool ExecCheckRTPerms(List *rangeTable, bool ereport_on_violation); ! extern void CheckValidResultRel(Relation resultRel, CmdType operation); extern void InitResultRelInfo(ResultRelInfo *resultRelInfo, Relation resultRelationDesc, Index resultRelationIndex, --- 184,190 ---- extern void standard_ExecutorEnd(QueryDesc *queryDesc); extern void ExecutorRewind(QueryDesc *queryDesc); extern bool ExecCheckRTPerms(List *rangeTable, bool ereport_on_violation); ! extern void CheckValidResultRel(ResultRelInfo *resultRelInfo, CmdType operation); extern void InitResultRelInfo(ResultRelInfo *resultRelInfo, Relation resultRelationDesc, Index resultRelationIndex, *** a/src/include/foreign/fdwapi.h --- b/src/include/foreign/fdwapi.h *************** *** 91,96 **** typedef TupleTableSlot *(*ExecForeignDelete_function) (EState *estate, --- 91,108 ---- typedef void (*EndForeignModify_function) (EState *estate, ResultRelInfo *rinfo); + typedef bool (*PlanDMLPushdown_function) (PlannerInfo *root, + ModifyTable *plan, + Index resultRelation, + int subplan_index); + + typedef void (*BeginDMLPushdown_function) (ForeignScanState *node, + int eflags); + + typedef TupleTableSlot *(*IterateDMLPushdown_function) (ForeignScanState *node); + + typedef void (*EndDMLPushdown_function) (ForeignScanState *node); + typedef int (*IsForeignRelUpdatable_function) (Relation rel); typedef RowMarkType (*GetForeignRowMarkType_function) (RangeTblEntry *rte, *************** *** 110,115 **** typedef void (*ExplainForeignModify_function) (ModifyTableState *mtstate, --- 122,130 ---- int subplan_index, struct ExplainState *es); + typedef void (*ExplainDMLPushdown_function) (ForeignScanState *node, + struct ExplainState *es); + typedef int (*AcquireSampleRowsFunc) (Relation relation, int elevel, HeapTuple *rows, int targrows, double *totalrows, *************** *** 161,166 **** typedef struct FdwRoutine --- 176,185 ---- ExecForeignUpdate_function ExecForeignUpdate; ExecForeignDelete_function ExecForeignDelete; EndForeignModify_function EndForeignModify; + PlanDMLPushdown_function PlanDMLPushdown; + BeginDMLPushdown_function BeginDMLPushdown; + IterateDMLPushdown_function IterateDMLPushdown; + EndDMLPushdown_function EndDMLPushdown; IsForeignRelUpdatable_function IsForeignRelUpdatable; /* Functions for SELECT FOR UPDATE/SHARE row locking */ *************** *** 171,176 **** typedef struct FdwRoutine --- 190,196 ---- /* Support functions for EXPLAIN */ ExplainForeignScan_function ExplainForeignScan; ExplainForeignModify_function ExplainForeignModify; + ExplainDMLPushdown_function ExplainDMLPushdown; /* Support functions for ANALYZE */ AnalyzeForeignTable_function AnalyzeForeignTable; *** a/src/include/nodes/execnodes.h --- b/src/include/nodes/execnodes.h *************** *** 311,316 **** typedef struct JunkFilter --- 311,317 ---- * TrigInstrument optional runtime measurements for triggers * FdwRoutine FDW callback functions, if foreign table * FdwState available to save private state of FDW + * FdwPushdown true when the command is pushed down * WithCheckOptions list of WithCheckOption's to be checked * WithCheckOptionExprs list of WithCheckOption expr states * ConstraintExprs array of constraint-checking expr states *************** *** 334,339 **** typedef struct ResultRelInfo --- 335,341 ---- Instrumentation *ri_TrigInstrument; struct FdwRoutine *ri_FdwRoutine; void *ri_FdwState; + bool ri_FdwPushdown; List *ri_WithCheckOptions; List *ri_WithCheckOptionExprs; List **ri_ConstraintExprs; *** a/src/include/nodes/plannodes.h --- b/src/include/nodes/plannodes.h *************** *** 188,193 **** typedef struct ModifyTable --- 188,194 ---- List *withCheckOptionLists; /* per-target-table WCO lists */ List *returningLists; /* per-target-table RETURNING tlists */ List *fdwPrivLists; /* per-target-table FDW private data lists */ + List *fdwPushdowns; /* per-target-table FDW pushdown flags */ List *rowMarks; /* PlanRowMarks (non-locking only) */ int epqParam; /* ID of Param for EvalPlanQual re-eval */ OnConflictAction onConflictAction; /* ON CONFLICT action */ *************** *** 530,535 **** typedef struct WorkTableScan --- 531,537 ---- typedef struct ForeignScan { Scan scan; + CmdType operation; /* SELECT/UPDATE/DELETE */ Oid fs_server; /* OID of foreign server */ List *fdw_exprs; /* expressions that FDW may evaluate */ List *fdw_private; /* private data for FDW */