From 1baf6a646c3dfd743b049cfc961d35e85fd9063d Mon Sep 17 00:00:00 2001 From: amitlan Date: Tue, 28 Jun 2022 17:15:51 +0900 Subject: [PATCH v1 1/2] Avoid using SPI in RI trigger functions Currently, ri_PlanCheck() uses SPI_prepare() to get an "SPI plan" containing the CachedPlanSource for the SQL query that a given RI trigger function uses to implement an RI check. Furthermore, ri_PerformCheck() calls SPI_execute_snapshot() on the "SPI plan" to execute the query using a given snapshot. This commit invents ri_PlanCreate() and ri_PlanExecute() to take the place of SPI_prepare() and SPI_execute_snapshot() respectively. ri_PlanCreate() will create an "RI plan" for a given query, using a caller-specified (caller of ri_PlanCheck() that is) callback function. For example, the callback ri_SqlStringPlanCreate() will produce a CachedPlanSource for the input SQL string, just as SPI_prepare() would. ri_PlanExecute() will execute the "RI plan" by calling a caller-specific callback function whose pointer is saved within the "RI Plan" data structure (struct RIPlan). For example, the callback ri_SqlStringPlanExecute() will fetch a CachedPlan for given CachedPlanSource found in the "RI plan" and execute its PlannedStmt by invoking the executor, just as SPI_execute_snapshot() would. The details such as which snapshot to use are now fully controlled by ri_PerformCheck, whereas the previous arrangement relied on the SPI logic for snapshot management. By making ri_PlanCreate() and ri_PlanExecute() and the "RI plan" data structure pluggable, it will be possible for the future commits to replace the current SQL string based implementation of some RI checks with something as simple as a C function to directly scan the underlying table/index. NB: RI_Initial_Check() and RI_PartitionRemove_Check() still use the the SPI_prepare()/SPI_execute_snapshot() combination, because I haven't yet added a proper DestReceiver in ri_SqlStringPlanExecute() to receive and process the tuples that the execution would produce, which those RI_* functions will need. --- src/backend/executor/spi.c | 2 +- src/backend/utils/adt/ri_triggers.c | 598 ++++++++++++++++++++++------ 2 files changed, 488 insertions(+), 112 deletions(-) diff --git a/src/backend/executor/spi.c b/src/backend/executor/spi.c index 29bc26669b..1d5d7d0383 100644 --- a/src/backend/executor/spi.c +++ b/src/backend/executor/spi.c @@ -762,7 +762,7 @@ SPI_execute_plan_with_paramlist(SPIPlanPtr plan, ParamListInfo params, * end of the command. * * This is currently not documented in spi.sgml because it is only intended - * for use by RI triggers. + * for use by some functions in ri_triggers.c. * * Passing snapshot == InvalidSnapshot will select the normal behavior of * fetching a new snapshot for each query. diff --git a/src/backend/utils/adt/ri_triggers.c b/src/backend/utils/adt/ri_triggers.c index 51b3fdc9a0..73b51eea73 100644 --- a/src/backend/utils/adt/ri_triggers.c +++ b/src/backend/utils/adt/ri_triggers.c @@ -9,7 +9,7 @@ * across query and transaction boundaries, in fact they live as long as * the backend does. This works because the hashtable structures * themselves are allocated by dynahash.c in its permanent DynaHashCxt, - * and the SPI plans they point to are saved using SPI_keepplan(). + * and the CachedPlanSources they point to are saved in CachedMemoryContext. * There is not currently any provision for throwing away a no-longer-needed * plan --- consider improving this someday. * @@ -40,6 +40,8 @@ #include "parser/parse_coerce.h" #include "parser/parse_relation.h" #include "storage/bufmgr.h" +#include "tcop/pquery.h" +#include "tcop/utility.h" #include "utils/acl.h" #include "utils/builtins.h" #include "utils/datum.h" @@ -127,10 +129,55 @@ typedef struct RI_ConstraintInfo dlist_node valid_link; /* Link in list of valid entries */ } RI_ConstraintInfo; +/* RI plan callback functions */ +struct RI_Plan; +typedef void (*RI_PlanCreateFunc_type) (struct RI_Plan *plan, const char *querystr, int nargs, Oid *paramtypes); +typedef int (*RI_PlanExecFunc_type) (struct RI_Plan *plan, Relation fk_rel, Relation pk_rel, + Datum *param_vals, char *params_isnulls, + Snapshot test_snapshot, Snapshot crosscheck_snapshot, + int limit, CmdType *last_stmt_cmdtype); +typedef bool (*RI_PlanIsValidFunc_type) (struct RI_Plan *plan); +typedef void (*RI_PlanFreeFunc_type) (struct RI_Plan *plan); + +/* + * RI_Plan + * + * Information related to the implementation of a plan for a given RI query. + * ri_PlanCheck() makes and stores these in ri_query_cache. The callers of + * ri_PlanCheck() specify a RI_PlanCreateFunc_type function to fill in the + * caller-specific implementation details such as the callback functions + * to create, validate, free a plan, and also the arguments necessary for + * the execution of the plan. + */ +typedef struct RI_Plan +{ + /* + * Context under which this struct and its subsidiary data gets allocated. + * It is made a child of CacheMemoryContext. + */ + MemoryContext plancxt; + + /* Query parameter types. */ + int nargs; + Oid *paramtypes; + + /* + * Set of functions specified by a RI trigger function to implement + * the plan for the trigger's RI query. + */ + RI_PlanExecFunc_type plan_exec_func; /* execute the plan */ + void *plan_exec_arg; /* execution argument, such as + * a List of CachedPlanSource */ + RI_PlanIsValidFunc_type plan_is_valid_func; /* check if the plan still + * valid for ri_query_cache + * to continue caching it */ + RI_PlanFreeFunc_type plan_free_func; /* release plan resources */ +} RI_Plan; + /* * RI_QueryKey * - * The key identifying a prepared SPI plan in our query hashtable + * The key identifying a plan in our query hashtable */ typedef struct RI_QueryKey { @@ -144,7 +191,7 @@ typedef struct RI_QueryKey typedef struct RI_QueryHashEntry { RI_QueryKey key; - SPIPlanPtr plan; + RI_Plan *plan; } RI_QueryHashEntry; /* @@ -208,8 +255,8 @@ static bool ri_AttributesEqual(Oid eq_opr, Oid typeid, static void ri_InitHashTables(void); static void InvalidateConstraintCacheCallBack(Datum arg, int cacheid, uint32 hashvalue); -static SPIPlanPtr ri_FetchPreparedPlan(RI_QueryKey *key); -static void ri_HashPreparedPlan(RI_QueryKey *key, SPIPlanPtr plan); +static RI_Plan *ri_FetchPreparedPlan(RI_QueryKey *key); +static void ri_HashPreparedPlan(RI_QueryKey *key, RI_Plan *plan); static RI_CompareHashEntry *ri_HashCompareOp(Oid eq_opr, Oid typeid); static void ri_CheckTrigger(FunctionCallInfo fcinfo, const char *funcname, @@ -218,13 +265,14 @@ static const RI_ConstraintInfo *ri_FetchConstraintInfo(Trigger *trigger, Relation trig_rel, bool rel_is_pk); static const RI_ConstraintInfo *ri_LoadConstraintInfo(Oid constraintOid); static Oid get_ri_constraint_root(Oid constrOid); -static SPIPlanPtr ri_PlanCheck(const char *querystr, int nargs, Oid *argtypes, - RI_QueryKey *qkey, Relation fk_rel, Relation pk_rel); +static RI_Plan *ri_PlanCheck(RI_PlanCreateFunc_type plan_create_func, + const char *querystr, int nargs, Oid *argtypes, + RI_QueryKey *qkey, Relation fk_rel, Relation pk_rel); static bool ri_PerformCheck(const RI_ConstraintInfo *riinfo, - RI_QueryKey *qkey, SPIPlanPtr qplan, + RI_QueryKey *qkey, RI_Plan *qplan, Relation fk_rel, Relation pk_rel, TupleTableSlot *oldslot, TupleTableSlot *newslot, - bool detectNewRows, int expect_OK); + bool detectNewRows, int expected_cmdtype); static void ri_ExtractValues(Relation rel, TupleTableSlot *slot, const RI_ConstraintInfo *riinfo, bool rel_is_pk, Datum *vals, char *nulls); @@ -232,6 +280,15 @@ static void ri_ReportViolation(const RI_ConstraintInfo *riinfo, Relation pk_rel, Relation fk_rel, TupleTableSlot *violatorslot, TupleDesc tupdesc, int queryno, bool partgone) pg_attribute_noreturn(); +static void ri_SqlStringPlanCreate(RI_Plan *plan, + const char *querystr, int nargs, Oid *paramtypes); +static bool ri_SqlStringPlanIsValid(RI_Plan *plan); +static int ri_SqlStringPlanExecute(RI_Plan *plan, Relation fk_rel, Relation pk_rel, + Datum *vals, char *nulls, + Snapshot test_snapshot, + Snapshot crosscheck_snapshot, + int limit, CmdType *last_stmt_cmdtype); +static void ri_SqlStringPlanFree(RI_Plan *plan); /* @@ -247,7 +304,7 @@ RI_FKey_check(TriggerData *trigdata) Relation pk_rel; TupleTableSlot *newslot; RI_QueryKey qkey; - SPIPlanPtr qplan; + RI_Plan *qplan; riinfo = ri_FetchConstraintInfo(trigdata->tg_trigger, trigdata->tg_relation, false); @@ -344,9 +401,6 @@ RI_FKey_check(TriggerData *trigdata) break; } - if (SPI_connect() != SPI_OK_CONNECT) - elog(ERROR, "SPI_connect failed"); - /* Fetch or prepare a saved plan for the real check */ ri_BuildQueryKey(&qkey, riinfo, RI_PLAN_CHECK_LOOKUPPK); @@ -392,8 +446,9 @@ RI_FKey_check(TriggerData *trigdata) } appendStringInfoString(&querybuf, " FOR KEY SHARE OF x"); - /* Prepare and save the plan */ - qplan = ri_PlanCheck(querybuf.data, riinfo->nkeys, queryoids, + /* Prepare and save the plan using ri_SqlStringPlanCreate(). */ + qplan = ri_PlanCheck(ri_SqlStringPlanCreate, + querybuf.data, riinfo->nkeys, queryoids, &qkey, fk_rel, pk_rel); } @@ -408,10 +463,7 @@ RI_FKey_check(TriggerData *trigdata) fk_rel, pk_rel, NULL, newslot, pk_rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE, - SPI_OK_SELECT); - - if (SPI_finish() != SPI_OK_FINISH) - elog(ERROR, "SPI_finish failed"); + CMD_SELECT); table_close(pk_rel, RowShareLock); @@ -466,16 +518,13 @@ ri_Check_Pk_Match(Relation pk_rel, Relation fk_rel, TupleTableSlot *oldslot, const RI_ConstraintInfo *riinfo) { - SPIPlanPtr qplan; + RI_Plan *qplan; RI_QueryKey qkey; bool result; /* Only called for non-null rows */ Assert(ri_NullCheck(RelationGetDescr(pk_rel), oldslot, riinfo, true) == RI_KEYS_NONE_NULL); - if (SPI_connect() != SPI_OK_CONNECT) - elog(ERROR, "SPI_connect failed"); - /* * Fetch or prepare a saved plan for checking PK table with values coming * from a PK row @@ -523,8 +572,9 @@ ri_Check_Pk_Match(Relation pk_rel, Relation fk_rel, } appendStringInfoString(&querybuf, " FOR KEY SHARE OF x"); - /* Prepare and save the plan */ - qplan = ri_PlanCheck(querybuf.data, riinfo->nkeys, queryoids, + /* Prepare and save the plan using ri_SqlStringPlanCreate(). */ + qplan = ri_PlanCheck(ri_SqlStringPlanCreate, + querybuf.data, riinfo->nkeys, queryoids, &qkey, fk_rel, pk_rel); } @@ -535,10 +585,7 @@ ri_Check_Pk_Match(Relation pk_rel, Relation fk_rel, fk_rel, pk_rel, oldslot, NULL, true, /* treat like update */ - SPI_OK_SELECT); - - if (SPI_finish() != SPI_OK_FINISH) - elog(ERROR, "SPI_finish failed"); + CMD_SELECT); return result; } @@ -632,7 +679,7 @@ ri_restrict(TriggerData *trigdata, bool is_no_action) Relation pk_rel; TupleTableSlot *oldslot; RI_QueryKey qkey; - SPIPlanPtr qplan; + RI_Plan *qplan; riinfo = ri_FetchConstraintInfo(trigdata->tg_trigger, trigdata->tg_relation, true); @@ -660,9 +707,6 @@ ri_restrict(TriggerData *trigdata, bool is_no_action) return PointerGetDatum(NULL); } - if (SPI_connect() != SPI_OK_CONNECT) - elog(ERROR, "SPI_connect failed"); - /* * Fetch or prepare a saved plan for the restrict lookup (it's the same * query for delete and update cases) @@ -715,8 +759,9 @@ ri_restrict(TriggerData *trigdata, bool is_no_action) } appendStringInfoString(&querybuf, " FOR KEY SHARE OF x"); - /* Prepare and save the plan */ - qplan = ri_PlanCheck(querybuf.data, riinfo->nkeys, queryoids, + /* Prepare and save the plan using ri_SqlStringPlanCreate(). */ + qplan = ri_PlanCheck(ri_SqlStringPlanCreate, + querybuf.data, riinfo->nkeys, queryoids, &qkey, fk_rel, pk_rel); } @@ -727,10 +772,7 @@ ri_restrict(TriggerData *trigdata, bool is_no_action) fk_rel, pk_rel, oldslot, NULL, true, /* must detect new rows */ - SPI_OK_SELECT); - - if (SPI_finish() != SPI_OK_FINISH) - elog(ERROR, "SPI_finish failed"); + CMD_SELECT); table_close(fk_rel, RowShareLock); @@ -752,7 +794,7 @@ RI_FKey_cascade_del(PG_FUNCTION_ARGS) Relation pk_rel; TupleTableSlot *oldslot; RI_QueryKey qkey; - SPIPlanPtr qplan; + RI_Plan *qplan; /* Check that this is a valid trigger call on the right time and event. */ ri_CheckTrigger(fcinfo, "RI_FKey_cascade_del", RI_TRIGTYPE_DELETE); @@ -770,9 +812,6 @@ RI_FKey_cascade_del(PG_FUNCTION_ARGS) pk_rel = trigdata->tg_relation; oldslot = trigdata->tg_trigslot; - if (SPI_connect() != SPI_OK_CONNECT) - elog(ERROR, "SPI_connect failed"); - /* Fetch or prepare a saved plan for the cascaded delete */ ri_BuildQueryKey(&qkey, riinfo, RI_PLAN_CASCADE_ONDELETE); @@ -820,8 +859,9 @@ RI_FKey_cascade_del(PG_FUNCTION_ARGS) queryoids[i] = pk_type; } - /* Prepare and save the plan */ - qplan = ri_PlanCheck(querybuf.data, riinfo->nkeys, queryoids, + /* Prepare and save the plan using ri_SqlStringPlanCreate(). */ + qplan = ri_PlanCheck(ri_SqlStringPlanCreate, + querybuf.data, riinfo->nkeys, queryoids, &qkey, fk_rel, pk_rel); } @@ -833,10 +873,7 @@ RI_FKey_cascade_del(PG_FUNCTION_ARGS) fk_rel, pk_rel, oldslot, NULL, true, /* must detect new rows */ - SPI_OK_DELETE); - - if (SPI_finish() != SPI_OK_FINISH) - elog(ERROR, "SPI_finish failed"); + CMD_DELETE); table_close(fk_rel, RowExclusiveLock); @@ -859,7 +896,7 @@ RI_FKey_cascade_upd(PG_FUNCTION_ARGS) TupleTableSlot *newslot; TupleTableSlot *oldslot; RI_QueryKey qkey; - SPIPlanPtr qplan; + RI_Plan *qplan; /* Check that this is a valid trigger call on the right time and event. */ ri_CheckTrigger(fcinfo, "RI_FKey_cascade_upd", RI_TRIGTYPE_UPDATE); @@ -879,9 +916,6 @@ RI_FKey_cascade_upd(PG_FUNCTION_ARGS) newslot = trigdata->tg_newslot; oldslot = trigdata->tg_trigslot; - if (SPI_connect() != SPI_OK_CONNECT) - elog(ERROR, "SPI_connect failed"); - /* Fetch or prepare a saved plan for the cascaded update */ ri_BuildQueryKey(&qkey, riinfo, RI_PLAN_CASCADE_ONUPDATE); @@ -942,8 +976,9 @@ RI_FKey_cascade_upd(PG_FUNCTION_ARGS) } appendBinaryStringInfo(&querybuf, qualbuf.data, qualbuf.len); - /* Prepare and save the plan */ - qplan = ri_PlanCheck(querybuf.data, riinfo->nkeys * 2, queryoids, + /* Prepare and save the plan using ri_SqlStringPlanCreate(). */ + qplan = ri_PlanCheck(ri_SqlStringPlanCreate, + querybuf.data, riinfo->nkeys * 2, queryoids, &qkey, fk_rel, pk_rel); } @@ -954,10 +989,7 @@ RI_FKey_cascade_upd(PG_FUNCTION_ARGS) fk_rel, pk_rel, oldslot, newslot, true, /* must detect new rows */ - SPI_OK_UPDATE); - - if (SPI_finish() != SPI_OK_FINISH) - elog(ERROR, "SPI_finish failed"); + CMD_UPDATE); table_close(fk_rel, RowExclusiveLock); @@ -1039,7 +1071,7 @@ ri_set(TriggerData *trigdata, bool is_set_null, int tgkind) Relation pk_rel; TupleTableSlot *oldslot; RI_QueryKey qkey; - SPIPlanPtr qplan; + RI_Plan *qplan; int32 queryno; riinfo = ri_FetchConstraintInfo(trigdata->tg_trigger, @@ -1055,9 +1087,6 @@ ri_set(TriggerData *trigdata, bool is_set_null, int tgkind) pk_rel = trigdata->tg_relation; oldslot = trigdata->tg_trigslot; - if (SPI_connect() != SPI_OK_CONNECT) - elog(ERROR, "SPI_connect failed"); - /* * Fetch or prepare a saved plan for the trigger. */ @@ -1174,8 +1203,9 @@ ri_set(TriggerData *trigdata, bool is_set_null, int tgkind) queryoids[i] = pk_type; } - /* Prepare and save the plan */ - qplan = ri_PlanCheck(querybuf.data, riinfo->nkeys, queryoids, + /* Prepare and save the plan using ri_SqlStringPlanCreate(). */ + qplan = ri_PlanCheck(ri_SqlStringPlanCreate, + querybuf.data, riinfo->nkeys, queryoids, &qkey, fk_rel, pk_rel); } @@ -1186,10 +1216,7 @@ ri_set(TriggerData *trigdata, bool is_set_null, int tgkind) fk_rel, pk_rel, oldslot, NULL, true, /* must detect new rows */ - SPI_OK_UPDATE); - - if (SPI_finish() != SPI_OK_FINISH) - elog(ERROR, "SPI_finish failed"); + CMD_UPDATE); table_close(fk_rel, RowExclusiveLock); @@ -1382,7 +1409,7 @@ RI_Initial_Check(Trigger *trigger, Relation fk_rel, Relation pk_rel) int save_nestlevel; char workmembuf[32]; int spi_result; - SPIPlanPtr qplan; + SPIPlanPtr qplan; riinfo = ri_FetchConstraintInfo(trigger, fk_rel, false); @@ -1963,7 +1990,7 @@ ri_GenerateQualCollation(StringInfo buf, Oid collation) /* ---------- * ri_BuildQueryKey - * - * Construct a hashtable key for a prepared SPI plan of an FK constraint. + * Construct a hashtable key for a plan of an FK constraint. * * key: output argument, *key is filled in based on the other arguments * riinfo: info derived from pg_constraint entry @@ -1982,9 +2009,9 @@ ri_BuildQueryKey(RI_QueryKey *key, const RI_ConstraintInfo *riinfo, * the FK constraint (i.e., not the table on which the trigger has been * fired), and so it will be the same for all members of the inheritance * tree. So we may use the root constraint's OID in the hash key, rather - * than the constraint's own OID. This avoids creating duplicate SPI - * plans, saving lots of work and memory when there are many partitions - * with similar FK constraints. + * than the constraint's own OID. This avoids creating duplicate plans, + * saving lots of work and memory when there are many partitions with + * similar FK constraints. * * (Note that we must still have a separate RI_ConstraintInfo for each * constraint, because partitions can have different column orders, @@ -2258,15 +2285,366 @@ InvalidateConstraintCacheCallBack(Datum arg, int cacheid, uint32 hashvalue) } } +/* Query string or an equivalent name to show in the error CONTEXT. */ +typedef struct RIErrorCallbackArg +{ + const char *query; +} RIErrorCallbackArg; + +/* + * _RI_error_callback + * + * Add context information when a query processed with ri_CreatePlan() or + * ri_PlanExecute() fails. + */ +static void +_RI_error_callback(void *arg) +{ + RIErrorCallbackArg *carg = (RIErrorCallbackArg *) arg; + const char *query = carg->query; + int syntaxerrposition; + + Assert(query != NULL); + + /* + * If there is a syntax error position, convert to internal syntax error; + * otherwise treat the query as an item of context stack + */ + syntaxerrposition = geterrposition(); + if (syntaxerrposition > 0) + { + errposition(0); + internalerrposition(syntaxerrposition); + internalerrquery(query); + } + else + errcontext("SQL statement \"%s\"", query); +} + +/* + * This creates a plan for a query written in SQL. + * + * The main product is a list of CachedPlanSource for each of the queries + * resulting from the provided query's rewrite that is saved to + * plan->plan_exec_arg. + */ +static void +ri_SqlStringPlanCreate(RI_Plan *plan, + const char *querystr, int nargs, Oid *paramtypes) +{ + List *raw_parsetree_list; + List *plancache_list = NIL; + ListCell *list_item; + RIErrorCallbackArg ricallbackarg; + ErrorContextCallback rierrcontext; + + Assert(querystr != NULL); + + /* + * Setup error traceback support for ereport() + */ + ricallbackarg.query = querystr; + rierrcontext.callback = _RI_error_callback; + rierrcontext.arg = &ricallbackarg; + rierrcontext.previous = error_context_stack; + error_context_stack = &rierrcontext; + + /* + * Parse the request string into a list of raw parse trees. + */ + raw_parsetree_list = raw_parser(querystr, RAW_PARSE_DEFAULT); + + /* + * Do parse analysis and rule rewrite for each raw parsetree, storing the + * results into unsaved plancache entries. + */ + plancache_list = NIL; + + foreach(list_item, raw_parsetree_list) + { + RawStmt *parsetree = lfirst_node(RawStmt, list_item); + List *stmt_list; + CachedPlanSource *plansource; + + /* + * Create the CachedPlanSource before we do parse analysis, since it + * needs to see the unmodified raw parse tree. + */ + plansource = CreateCachedPlan(parsetree, querystr, + CreateCommandTag(parsetree->stmt)); + + stmt_list = pg_analyze_and_rewrite_fixedparams(parsetree, querystr, + paramtypes, nargs, + NULL); + + /* Finish filling in the CachedPlanSource */ + CompleteCachedPlan(plansource, + stmt_list, + NULL, + paramtypes, nargs, + NULL, NULL, 0, + false); /* not fixed result */ + + SaveCachedPlan(plansource); + plancache_list = lappend(plancache_list, plansource); + } + + plan->plan_exec_func = ri_SqlStringPlanExecute; + plan->plan_exec_arg = (void *) plancache_list; + plan->plan_is_valid_func = ri_SqlStringPlanIsValid; + plan->plan_free_func = ri_SqlStringPlanFree; + + /* + * Pop the error context stack + */ + error_context_stack = rierrcontext.previous; +} + +/* + * This executes the plan after creating a CachedPlan for each + * CachedPlanSource found stored in plan->plan_exec_arg using given + * parameter values. + * + * Return value is the number of tuples returned by the "last" CachedPlan. + */ +static int +ri_SqlStringPlanExecute(RI_Plan *plan, Relation fk_rel, Relation pk_rel, + Datum *param_vals, char *param_isnulls, + Snapshot test_snapshot, + Snapshot crosscheck_snapshot, + int limit, CmdType *last_stmt_cmdtype) +{ + List *plancache_list = (List *) plan->plan_exec_arg; + ListCell *lc; + CachedPlan *cplan; + ResourceOwner plan_owner; + int tuples_processed; + ParamListInfo paramLI; + RIErrorCallbackArg ricallbackarg; + ErrorContextCallback rierrcontext; + + /* + * Setup error traceback support for ereport() + */ + ricallbackarg.query = NULL; /* will be filled below */ + rierrcontext.callback = _RI_error_callback; + rierrcontext.arg = &ricallbackarg; + rierrcontext.previous = error_context_stack; + error_context_stack = &rierrcontext; + + /* + * Convert the parameters into a format that the planner and the executor + * expect them to be in. + */ + if (plan->nargs > 0) + { + paramLI = makeParamList(plan->nargs); + + for (int i = 0; i < plan->nargs; i++) + { + ParamExternData *prm = ¶mLI->params[i]; + + prm->value = param_vals[i]; + prm->isnull = (param_isnulls && param_isnulls[i] == 'n'); + prm->pflags = PARAM_FLAG_CONST; + prm->ptype = plan->paramtypes[i]; + } + } + else + paramLI = NULL; + + plan_owner = CurrentResourceOwner; /* XXX - why? */ + foreach(lc, plancache_list) + { + CachedPlanSource *plansource = (CachedPlanSource *) lfirst(lc); + List *stmt_list; + ListCell *lc2; + + ricallbackarg.query = plansource->query_string; + + /* + * Replan if needed, and increment plan refcount. If it's a saved + * plan, the refcount must be backed by the plan_owner. + */ + cplan = GetCachedPlan(plansource, paramLI, plan_owner, NULL); + + stmt_list = cplan->stmt_list; + + foreach(lc2, stmt_list) + { + PlannedStmt *stmt = lfirst_node(PlannedStmt, lc2); + DestReceiver *dest; + QueryDesc *qdesc; + int eflags; + + *last_stmt_cmdtype = stmt->commandType; + + /* + * Advance the command counter before each command and update the + * snapshot. + */ + CommandCounterIncrement(); + UpdateActiveSnapshotCommandId(); + + dest = CreateDestReceiver(DestNone); + qdesc = CreateQueryDesc(stmt, plansource->query_string, + test_snapshot, crosscheck_snapshot, + dest, paramLI, NULL, 0); + + /* Select execution options */ + eflags = EXEC_FLAG_SKIP_TRIGGERS; + ExecutorStart(qdesc, eflags); + ExecutorRun(qdesc, ForwardScanDirection, limit, true); + /* We return the last executed statement's value. */ + tuples_processed = qdesc->estate->es_processed; + ExecutorFinish(qdesc); + ExecutorEnd(qdesc); + FreeQueryDesc(qdesc); + } + + /* Done with this plan, so release refcount */ + ReleaseCachedPlan(cplan, CurrentResourceOwner); + cplan = NULL; + } + + /* We no longer need the cached plan refcount, if any */ + if (cplan) + ReleaseCachedPlan(cplan, plan_owner); + + /* + * Pop the error context stack + */ + error_context_stack = rierrcontext.previous; + + return tuples_processed; +} + +/* + * Have any of the CachedPlanSources been invalidated since being created? + */ +static bool +ri_SqlStringPlanIsValid(RI_Plan *plan) +{ + List *plancache_list = (List *) plan->plan_exec_arg; + ListCell *lc; + + foreach(lc, plancache_list) + { + CachedPlanSource *plansource = (CachedPlanSource *) lfirst(lc); + + if (!CachedPlanIsValid(plansource)) + return false; + } + return true; +} + +/* Release CachedPlanSources and associated CachedPlans if any.*/ +static void +ri_SqlStringPlanFree(RI_Plan *plan) +{ + List *plancache_list = (List *) plan->plan_exec_arg; + ListCell *lc; + + foreach(lc, plancache_list) + { + CachedPlanSource *plansource = (CachedPlanSource *) lfirst(lc); + + DropCachedPlan(plansource); + } +} + +/* + * Create an RI_Plan for a given RI check query and initialize the + * plan callbacks and execution argument using the caller specified + * function. + */ +static RI_Plan * +ri_PlanCreate(RI_PlanCreateFunc_type plan_create_func, + const char *querystr, int nargs, Oid *paramtypes) +{ + RI_Plan *plan; + MemoryContext plancxt, + oldcxt; + + /* + * Create a memory context for the plan underneath CurrentMemoryContext, + * which is reparented later to be underneath CacheMemoryContext; + */ + plancxt = AllocSetContextCreate(CurrentMemoryContext, + "RI Plan", + ALLOCSET_SMALL_SIZES); + oldcxt = MemoryContextSwitchTo(plancxt); + plan = (RI_Plan *) palloc0(sizeof(*plan)); + plan->plancxt = plancxt; + plan->nargs = nargs; + if (plan->nargs > 0) + { + plan->paramtypes = (Oid *) palloc(plan->nargs * sizeof(Oid)); + memcpy(plan->paramtypes, paramtypes, plan->nargs * sizeof(Oid)); + } + + plan_create_func(plan, querystr, nargs, paramtypes); + + MemoryContextSetParent(plan->plancxt, CacheMemoryContext); + MemoryContextSwitchTo(oldcxt); + + return plan; +} + +/* + * Execute the plan by calling plan_exec_func(). + * + * Returns the number of tuples obtained by executing the plan; the caller + * typically wants to checks if at least 1 row was returned. + * + * *last_stmt_cmdtype is set to the CmdType of the last operation performed + * by executing the plan, which may consist of more than 1 executable + * statements if, for example, any rules belonging to the tables mentioned in + * the original query added additional operations. + */ +static int +ri_PlanExecute(RI_Plan *plan, Relation fk_rel, Relation pk_rel, + Datum *param_vals, char *param_isnulls, + Snapshot test_snapshot, Snapshot crosscheck_snapshot, + int limit, CmdType *last_stmt_cmdtype) +{ + Assert(test_snapshot != NULL && ActiveSnapshotSet()); + return plan->plan_exec_func(plan, fk_rel, pk_rel, + param_vals, param_isnulls, + test_snapshot, + crosscheck_snapshot, + limit, last_stmt_cmdtype); +} + +/* + * Is the plan still valid to continue caching? + */ +static bool +ri_PlanIsValid(RI_Plan *plan) +{ + return plan->plan_is_valid_func(plan); +} + +/* Release plan resources. */ +static void +ri_FreePlan(RI_Plan *plan) +{ + /* First call the implementation specific release function. */ + plan->plan_free_func(plan); + + /* Now get rid of the RI_plan and subsidiary data in its plancxt */ + MemoryContextDelete(plan->plancxt); +} /* * Prepare execution plan for a query to enforce an RI restriction */ -static SPIPlanPtr -ri_PlanCheck(const char *querystr, int nargs, Oid *argtypes, +static RI_Plan * +ri_PlanCheck(RI_PlanCreateFunc_type plan_create_func, + const char *querystr, int nargs, Oid *argtypes, RI_QueryKey *qkey, Relation fk_rel, Relation pk_rel) { - SPIPlanPtr qplan; + RI_Plan *qplan; Relation query_rel; Oid save_userid; int save_sec_context; @@ -2285,18 +2663,12 @@ ri_PlanCheck(const char *querystr, int nargs, Oid *argtypes, SetUserIdAndSecContext(RelationGetForm(query_rel)->relowner, save_sec_context | SECURITY_LOCAL_USERID_CHANGE | SECURITY_NOFORCE_RLS); - /* Create the plan */ - qplan = SPI_prepare(querystr, nargs, argtypes); - - if (qplan == NULL) - elog(ERROR, "SPI_prepare returned %s for %s", SPI_result_code_string(SPI_result), querystr); + qplan = ri_PlanCreate(plan_create_func, querystr, nargs, argtypes); /* Restore UID and security context */ SetUserIdAndSecContext(save_userid, save_sec_context); - /* Save the plan */ - SPI_keepplan(qplan); ri_HashPreparedPlan(qkey, qplan); return qplan; @@ -2307,10 +2679,10 @@ ri_PlanCheck(const char *querystr, int nargs, Oid *argtypes, */ static bool ri_PerformCheck(const RI_ConstraintInfo *riinfo, - RI_QueryKey *qkey, SPIPlanPtr qplan, + RI_QueryKey *qkey, RI_Plan *qplan, Relation fk_rel, Relation pk_rel, TupleTableSlot *oldslot, TupleTableSlot *newslot, - bool detectNewRows, int expect_OK) + bool detectNewRows, int expected_cmdtype) { Relation query_rel, source_rel; @@ -2318,11 +2690,12 @@ ri_PerformCheck(const RI_ConstraintInfo *riinfo, Snapshot test_snapshot; Snapshot crosscheck_snapshot; int limit; - int spi_result; + int tuples_processed; Oid save_userid; int save_sec_context; Datum vals[RI_MAX_NUMKEYS * 2]; char nulls[RI_MAX_NUMKEYS * 2]; + CmdType last_stmt_cmdtype; /* * Use the query type code to determine whether the query is run against @@ -2373,30 +2746,36 @@ ri_PerformCheck(const RI_ConstraintInfo *riinfo, * the caller passes detectNewRows == false then it's okay to do the query * with the transaction snapshot; otherwise we use a current snapshot, and * tell the executor to error out if it finds any rows under the current - * snapshot that wouldn't be visible per the transaction snapshot. Note - * that SPI_execute_snapshot will register the snapshots, so we don't need - * to bother here. + * snapshot that wouldn't be visible per the transaction snapshot. + * + * Also push the chosen snapshot so that anyplace that wants to use it + * can get it by calling GetActiveSnapshot(). */ if (IsolationUsesXactSnapshot() && detectNewRows) { - CommandCounterIncrement(); /* be sure all my own work is visible */ test_snapshot = GetLatestSnapshot(); crosscheck_snapshot = GetTransactionSnapshot(); + /* Make sure we have a private copy of the snapshot to modify. */ + PushCopiedSnapshot(test_snapshot); } else { - /* the default SPI behavior is okay */ - test_snapshot = InvalidSnapshot; + test_snapshot = GetTransactionSnapshot(); crosscheck_snapshot = InvalidSnapshot; + PushActiveSnapshot(test_snapshot); } + /* Also advance the command counter and update the snapshot. */ + CommandCounterIncrement(); + UpdateActiveSnapshotCommandId(); + /* * If this is a select query (e.g., for a 'no action' or 'restrict' * trigger), we only need to see if there is a single row in the table, * matching the key. Otherwise, limit = 0 - because we want the query to * affect ALL the matching rows. */ - limit = (expect_OK == SPI_OK_SELECT) ? 1 : 0; + limit = (expected_cmdtype == CMD_SELECT) ? 1 : 0; /* Switch to proper UID to perform check as */ GetUserIdAndSecContext(&save_userid, &save_sec_context); @@ -2405,19 +2784,16 @@ ri_PerformCheck(const RI_ConstraintInfo *riinfo, SECURITY_NOFORCE_RLS); /* Finally we can run the query. */ - spi_result = SPI_execute_snapshot(qplan, - vals, nulls, + tuples_processed = ri_PlanExecute(qplan, fk_rel, pk_rel, vals, nulls, test_snapshot, crosscheck_snapshot, - false, false, limit); + limit, &last_stmt_cmdtype); /* Restore UID and security context */ SetUserIdAndSecContext(save_userid, save_sec_context); - /* Check result */ - if (spi_result < 0) - elog(ERROR, "SPI_execute_snapshot returned %s", SPI_result_code_string(spi_result)); + PopActiveSnapshot(); - if (expect_OK >= 0 && spi_result != expect_OK) + if (last_stmt_cmdtype != expected_cmdtype) ereport(ERROR, (errcode(ERRCODE_INTERNAL_ERROR), errmsg("referential integrity query on \"%s\" from constraint \"%s\" on \"%s\" gave unexpected result", @@ -2428,15 +2804,15 @@ ri_PerformCheck(const RI_ConstraintInfo *riinfo, /* XXX wouldn't it be clearer to do this part at the caller? */ if (qkey->constr_queryno != RI_PLAN_CHECK_LOOKUPPK_FROM_PK && - expect_OK == SPI_OK_SELECT && - (SPI_processed == 0) == (qkey->constr_queryno == RI_PLAN_CHECK_LOOKUPPK)) + expected_cmdtype == CMD_SELECT && + (tuples_processed == 0) == (qkey->constr_queryno == RI_PLAN_CHECK_LOOKUPPK)) ri_ReportViolation(riinfo, pk_rel, fk_rel, newslot ? newslot : oldslot, NULL, qkey->constr_queryno, false); - return SPI_processed != 0; + return tuples_processed != 0; } /* @@ -2699,14 +3075,14 @@ ri_InitHashTables(void) /* * ri_FetchPreparedPlan - * - * Lookup for a query key in our private hash table of prepared - * and saved SPI execution plans. Return the plan if found or NULL. + * Lookup for a query key in our private hash table of saved RI plans. + * Return the plan if found or NULL. */ -static SPIPlanPtr +static RI_Plan * ri_FetchPreparedPlan(RI_QueryKey *key) { RI_QueryHashEntry *entry; - SPIPlanPtr plan; + RI_Plan *plan; /* * On the first call initialize the hashtable @@ -2734,7 +3110,7 @@ ri_FetchPreparedPlan(RI_QueryKey *key) * locked both FK and PK rels. */ plan = entry->plan; - if (plan && SPI_plan_is_valid(plan)) + if (plan && ri_PlanIsValid(plan)) return plan; /* @@ -2743,7 +3119,7 @@ ri_FetchPreparedPlan(RI_QueryKey *key) */ entry->plan = NULL; if (plan) - SPI_freeplan(plan); + ri_FreePlan(plan); return NULL; } @@ -2755,7 +3131,7 @@ ri_FetchPreparedPlan(RI_QueryKey *key) * Add another plan to our private SPI query plan hashtable. */ static void -ri_HashPreparedPlan(RI_QueryKey *key, SPIPlanPtr plan) +ri_HashPreparedPlan(RI_QueryKey *key, RI_Plan *plan) { RI_QueryHashEntry *entry; bool found; -- 2.35.3