diff --git a/src/backend/catalog/information_schema.sql b/src/backend/catalog/information_schema.sql new file mode 100644 index 4bd942f..440d848 *** a/src/backend/catalog/information_schema.sql --- b/src/backend/catalog/information_schema.sql *************** CREATE VIEW views AS *** 2497,2510 **** CAST('NONE' AS character_data) AS check_option, CAST( ! CASE WHEN EXISTS (SELECT 1 FROM pg_rewrite WHERE ev_class = c.oid AND ev_type = '2' AND is_instead) ! AND EXISTS (SELECT 1 FROM pg_rewrite WHERE ev_class = c.oid AND ev_type = '4' AND is_instead) ! THEN 'YES' ELSE 'NO' END AS yes_or_no) AS is_updatable, CAST( ! CASE WHEN EXISTS (SELECT 1 FROM pg_rewrite WHERE ev_class = c.oid AND ev_type = '3' AND is_instead) ! THEN 'YES' ELSE 'NO' END AS yes_or_no) AS is_insertable_into, CAST( --- 2497,2507 ---- CAST('NONE' AS character_data) AS check_option, CAST( ! CASE WHEN pg_is_view_updatable(c.oid) THEN 'YES' ELSE 'NO' END AS yes_or_no) AS is_updatable, CAST( ! CASE WHEN pg_is_view_insertable(c.oid) THEN 'YES' ELSE 'NO' END AS yes_or_no) AS is_insertable_into, CAST( diff --git a/src/backend/nodes/copyfuncs.c b/src/backend/nodes/copyfuncs.c new file mode 100644 index f34f704..f4cb6dc *** a/src/backend/nodes/copyfuncs.c --- b/src/backend/nodes/copyfuncs.c *************** _copyRangeTblEntry(const RangeTblEntry * *** 1994,1999 **** --- 1994,2000 ---- COPY_SCALAR_FIELD(checkAsUser); COPY_BITMAPSET_FIELD(selectedCols); COPY_BITMAPSET_FIELD(modifiedCols); + COPY_NODE_FIELD(securityQuals); return newnode; } diff --git a/src/backend/nodes/equalfuncs.c b/src/backend/nodes/equalfuncs.c new file mode 100644 index b4b1c22..51c73b5 *** a/src/backend/nodes/equalfuncs.c --- b/src/backend/nodes/equalfuncs.c *************** _equalRangeTblEntry(const RangeTblEntry *** 2305,2310 **** --- 2305,2311 ---- COMPARE_SCALAR_FIELD(checkAsUser); COMPARE_BITMAPSET_FIELD(selectedCols); COMPARE_BITMAPSET_FIELD(modifiedCols); + COMPARE_NODE_FIELD(securityQuals); return true; } diff --git a/src/backend/nodes/nodeFuncs.c b/src/backend/nodes/nodeFuncs.c new file mode 100644 index b130902..4d461b2 *** a/src/backend/nodes/nodeFuncs.c --- b/src/backend/nodes/nodeFuncs.c *************** range_table_walker(List *rtable, *** 1942,1947 **** --- 1942,1950 ---- return true; break; } + + if (walker(rte->securityQuals, context)) + return true; } return false; } *************** range_table_mutator(List *rtable, *** 2653,2658 **** --- 2656,2662 ---- MUTATE(newrte->values_lists, rte->values_lists, List *); break; } + MUTATE(newrte->securityQuals, rte->securityQuals, List *); newrt = lappend(newrt, newrte); } return newrt; diff --git a/src/backend/nodes/outfuncs.c b/src/backend/nodes/outfuncs.c new file mode 100644 index 01f381e..eed6869 *** a/src/backend/nodes/outfuncs.c --- b/src/backend/nodes/outfuncs.c *************** _outRangeTblEntry(StringInfo str, const *** 2382,2387 **** --- 2382,2388 ---- WRITE_OID_FIELD(checkAsUser); WRITE_BITMAPSET_FIELD(selectedCols); WRITE_BITMAPSET_FIELD(modifiedCols); + WRITE_NODE_FIELD(securityQuals); } static void diff --git a/src/backend/nodes/readfuncs.c b/src/backend/nodes/readfuncs.c new file mode 100644 index 1eb7582..bb4009c *** a/src/backend/nodes/readfuncs.c --- b/src/backend/nodes/readfuncs.c *************** _readRangeTblEntry(void) *** 1229,1234 **** --- 1229,1235 ---- READ_OID_FIELD(checkAsUser); READ_BITMAPSET_FIELD(selectedCols); READ_BITMAPSET_FIELD(modifiedCols); + READ_NODE_FIELD(securityQuals); READ_DONE(); } diff --git a/src/backend/optimizer/plan/setrefs.c b/src/backend/optimizer/plan/setrefs.c new file mode 100644 index ccd69fc..3cb0a65 *** a/src/backend/optimizer/plan/setrefs.c --- b/src/backend/optimizer/plan/setrefs.c *************** set_plan_references(PlannerInfo *root, P *** 224,229 **** --- 224,230 ---- newrte->ctecoltypes = NIL; newrte->ctecoltypmods = NIL; newrte->ctecolcollations = NIL; + newrte->securityQuals = NIL; glob->finalrtable = lappend(glob->finalrtable, newrte); diff --git a/src/backend/rewrite/rewriteHandler.c b/src/backend/rewrite/rewriteHandler.c new file mode 100644 index 8f75948..2f9a118 *** a/src/backend/rewrite/rewriteHandler.c --- b/src/backend/rewrite/rewriteHandler.c *************** *** 14,19 **** --- 14,20 ---- #include "postgres.h" #include "access/sysattr.h" + #include "catalog/heap.h" #include "catalog/pg_type.h" #include "commands/trigger.h" #include "nodes/makefuncs.h" *************** static List *matchLocks(CmdType event, R *** 60,65 **** --- 61,67 ---- int varno, Query *parsetree); static Query *fireRIRrules(Query *parsetree, List *activeRIRs, bool forUpdatePushedDown); + static Query *rewriteSecurityQuals(Query *parsetree); /* *************** fireRules(Query *parsetree, *** 1829,1834 **** --- 1831,2650 ---- /* + * get_view_query - get the query from a view's _RETURN rule. + */ + static Query * + get_view_query(Relation view) + { + int i; + + Assert(view->rd_rel->relkind == RELKIND_VIEW); + + for (i = 0; i < view->rd_rules->numLocks; i++) + { + RewriteRule *rule = view->rd_rules->rules[i]; + + if (rule->event == CMD_SELECT) + { + /* A _RETURN rule should have only one action */ + if (list_length(rule->actions) != 1) + elog(ERROR, "invalid _RETURN rule action specification"); + + return linitial(rule->actions); + } + } + + elog(ERROR, "failed to find _RETURN rule for view"); + return NULL; /* keep compiler quiet */ + } + + + /* + * test_auto_update_view - + * Test if the specified view can be automatically updated. This will + * either return NULL (if the view can be updated) or the reason that it + * cannot. + * + * Note that the checks performed here are local to this view. It does not + * check that the view's underlying base relation is updatable. + */ + static const char * + test_auto_update_view(Relation view) + { + Query *viewquery; + RangeTblRef *rtr; + RangeTblEntry *base_rte; + Bitmapset *bms; + ListCell *cell; + + /* + * Check if the view is simply updatable. According to SQL-92 this means: + * - No DISTINCT clauses. + * - Every TLE is a column reference, and appears at most once. + * - Refers to a single base relation. + * - No GROUP BY or HAVING clauses. + * - No set operations (UNION, INTERSECT or EXCEPT). + * - No sub-queries in the WHERE clause. + * + * We relax this last restriction since it would actually be more work to + * enforce it than to simply allow it. In addition (for now) we impose + * the following additional constraints, based on features that are not + * part of SQL-92: + * - No DISTINCT ON clauses. + * - No window functions. + * - No CTEs (WITH or WITH RECURSIVE). + * - No OFFSET or LIMIT clauses. + * - No system columns. + * + * Note that we do these checks without recursively expanding the view. + * The base relation may be a view, and it might have INSTEAD OF triggers + * or rules, in which case it need not satisfy these constraints. + */ + viewquery = get_view_query(view); + + if (viewquery->distinctClause != NIL || + viewquery->hasDistinctOn) + return "Views containing DISTINCT are not updatable"; + + if (viewquery->groupClause != NIL) + return "Views containing GROUP BY are not updatable"; + + if (viewquery->havingQual != NULL) + return "Views containing HAVING are not updatable"; + + if (viewquery->hasAggs) + return "Views containing aggregates are not updatable"; + + if (viewquery->hasWindowFuncs) + return "Views containing window functions are not updatable"; + + if (viewquery->setOperations != NULL) + return "Views containing UNION, INTERSECT or EXCEPT are not updatable"; + + if (viewquery->hasRecursive || + viewquery->cteList != NIL) + return "Views containing WITH [RECURSIVE] are not updatable"; + + if (viewquery->limitOffset != NULL || + viewquery->limitCount != NULL) + return "Views containing OFFSET or LIMIT are not updatable"; + + /* + * The view query should select from a single base relation, which must be + * a table or another view. + */ + if (list_length(viewquery->jointree->fromlist) == 0) + return "Views with no base relations are not updatable"; + + if (list_length(viewquery->jointree->fromlist) > 1) + return "Views with multiple base relations are not updatable"; + + rtr = (RangeTblRef *) linitial(viewquery->jointree->fromlist); + if (!IsA(rtr, RangeTblRef)) + return "Views that are not based on tables or views are not updatable"; + + base_rte = rt_fetch(rtr->rtindex, viewquery->rtable); + if (base_rte->rtekind != RTE_RELATION) + return "Views that are not based on tables or views are not updatable"; + + /* + * The view's targetlist entries should all be Vars referring to columns + * in the base relation, and no 2 should refer to the same base column. + * + * Note that we don't bother freeing resources allocated here if we + * return early, since that is an error condition. + */ + bms = NULL; + foreach(cell, viewquery->targetList) + { + TargetEntry *tle = (TargetEntry *) lfirst(cell); + Var *var = (Var *) tle->expr; + + if (!IsA(var, Var)) + return "Views with columns that are not simple references to columns in the base relation are not updatable"; + + if (var->varattno < 0) + return "Views that refer to system columns are not updatable"; + + if (var->varattno == 0) + return "Views that refer to whole rows from the base relation are not updatable"; + + if (bms_is_member(var->varattno, bms)) + return "Views that refer to the same column more than once are not updatable"; + + bms = bms_add_member(bms, var->varattno); + } + + bms_free(bms); + return NULL; /* the view is simply updatable */ + } + + + /* + * rewriteTargetView - + * attempt to rewrite a query where the target relation is a view, so that + * the view's base relation becomes the target relation. + * + * This only handles the case where there are no INSTEAD OF triggers to update + * the view. If there are INSTEAD OF triggers, this function will return NULL + * and the view modification is handled later by fireRIRrules. + * + * Note that the base relation here may itself be a view, which may or may not + * have INSTEAD OF triggers or rules to handle the update. That is handled by + * the recursion in RewriteQuery. + */ + static Query * + rewriteTargetView(Query *parsetree, Relation view) + { + TriggerDesc *trigDesc = view->trigdesc; + const char *auto_update_detail; + Query *viewquery; + RangeTblRef *rtr; + int base_rt_index; + RangeTblEntry *base_rte; + List *view_targetlist; + bool same_cols; + ListCell *cell; + int rt_index; + List *new_rtable; + RangeTblEntry *view_rte; + + /* The view must have INSTEAD OF triggers or be simply updatable */ + switch (parsetree->commandType) + { + case CMD_INSERT: + if (trigDesc && trigDesc->trig_insert_instead_row) + return NULL; /* INSTEAD OF INSERT trigger will be used */ + + auto_update_detail = test_auto_update_view(view); + if (auto_update_detail) + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("cannot insert into view \"%s\"", + RelationGetRelationName(view)), + errdetail("%s.", auto_update_detail), + errhint("You need an unconditional ON INSERT DO INSTEAD rule or an INSTEAD OF INSERT trigger."))); + break; + case CMD_UPDATE: + if (trigDesc && trigDesc->trig_update_instead_row) + return NULL; /* INSTEAD OF UPDATE trigger will be used */ + + auto_update_detail = test_auto_update_view(view); + if (auto_update_detail) + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("cannot update view \"%s\"", + RelationGetRelationName(view)), + errdetail("%s.", auto_update_detail), + errhint("You need an unconditional ON UPDATE DO INSTEAD rule or an INSTEAD OF UPDATE trigger."))); + break; + case CMD_DELETE: + if (trigDesc && trigDesc->trig_delete_instead_row) + return NULL; /* INSTEAD OF DELETE trigger will be used */ + + auto_update_detail = test_auto_update_view(view); + if (auto_update_detail) + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("cannot delete from view \"%s\"", + RelationGetRelationName(view)), + errdetail("%s.", auto_update_detail), + errhint("You need an unconditional ON DELETE DO INSTEAD rule or an INSTEAD OF DELETE trigger."))); + break; + default: + elog(ERROR, "unrecognized CmdType: %d", (int) parsetree->commandType); + break; + } + + /* + * The view is simply updatable, which means that it should have a single + * base relation. + */ + viewquery = get_view_query(view); + Assert(list_length(viewquery->jointree->fromlist) == 1); + + rtr = (RangeTblRef *) linitial(viewquery->jointree->fromlist); + Assert(IsA(rtr, RangeTblRef)); + + base_rt_index = rtr->rtindex; + base_rte = rt_fetch(base_rt_index, viewquery->rtable); + Assert(base_rte->rtekind == RTE_RELATION); + + /* + * Make a copy of the view's targetlist, adjusting its Vars to allow it + * to be plugged into the main query. + */ + view_targetlist = copyObject(viewquery->targetList); + + ChangeVarNodes((Node *) view_targetlist, + base_rt_index, parsetree->resultRelation, 0); + + /** + * Test for the simple case where the view has all the same columns as the + * base relation, in the same order. + */ + same_cols = true; + foreach(cell, view_targetlist) + { + TargetEntry *tle = (TargetEntry *) lfirst(cell); + Var *var = (Var *) tle->expr; + + if (var->varattno != tle->resno) + { + /* view columns are in a different order */ + same_cols = false; + break; + } + } + + /* + * Replace the old view target RTE with a new RTE referring to the base + * relation. Since we are not changing the result relation index, there + * is no need to update any varnos in the original query. + * + * Note that we need to keep the original view RTE, even though it will + * not be used directly in the query, so that the correct permissions + * checks are still done against it. This is appended to the end of the + * rangetable. + */ + rt_index = 1; + new_rtable = NIL; + foreach(cell, parsetree->rtable) + { + RangeTblEntry *rte = (RangeTblEntry *) lfirst(cell); + + if (rt_index == parsetree->resultRelation) + { + /* + * Create the new target RTE using the base relation. Copy any + * permissions checks from the original view target RTE, adjusting + * the column numbers if necessary, and performing the checks as + * the view's owner. + */ + RangeTblEntry *newrte = (RangeTblEntry *) copyObject(base_rte); + newrte->requiredPerms = rte->requiredPerms; + newrte->checkAsUser = view->rd_rel->relowner; + newrte->selectedCols = bms_copy(rte->selectedCols); + newrte->modifiedCols = bms_copy(rte->modifiedCols); + + if (!same_cols) + { + newrte->selectedCols = adjust_column_set(newrte->selectedCols, + view_targetlist); + newrte->modifiedCols = adjust_column_set(newrte->modifiedCols, + view_targetlist); + } + + /* + * Move any security barrier quals from the view RTE onto the new + * base relation RTE, since the original view RTE will not be + * referenced in the final query. + */ + newrte->securityQuals = rte->securityQuals; + rte->securityQuals = NIL; + + /* + * Keep the original view RTE so that executor also checks that + * the current user has the required permissions on the view + */ + view_rte = rte; + rte = newrte; + } + new_rtable = lappend(new_rtable, rte); + rt_index++; + } + parsetree->rtable = lappend(new_rtable, view_rte); + + /* + * For UPDATE/DELETE, rewriteTargetListUD will have added a wholerow junk + * TLE for the view to the end of the targetlist which we no longer need. + * We remove this now to avoid unnecessary work when we process the + * targetlist. Note that when we recurse through rewriteQuery a new junk + * TLE will be added to allow the executor to find the row in the base + * relation. + */ + if (parsetree->commandType != CMD_INSERT) + { + TargetEntry *tle = (TargetEntry *) + lfirst(list_tail(parsetree->targetList)); + + Assert(IsA(tle->expr, Var) && ((Var *) tle->expr)->varattno == 0); + parsetree->targetList = list_delete_ptr(parsetree->targetList, tle); + } + + /* + * If the view's columns don't match the base relation's columns, we need + * to update the targetlist to point to the columns in the base relation, + * and similarly update any Vars in the query that refer to the result + * relation. + * + * Note that this destroys the resno ordering of the targetlist, but that + * will be restored when we recurse through rewriteQuery which will invoke + * rewriteTargetListIU on this updated targetlist. + */ + if (!same_cols) + { + /* Update the main query's targetlist */ + foreach(cell, parsetree->targetList) + { + TargetEntry *tle = (TargetEntry *) lfirst(cell); + TargetEntry *view_tle; + + /* Ignore system columns */ + if (tle->resjunk || tle->resno <= 0) + continue; + + view_tle = get_tle_by_resno(view_targetlist, tle->resno); + if (view_tle == NULL) + elog(ERROR, "View column %d not found", tle->resno); + + tle->resno = ((Var *) view_tle->expr)->varattno; + } + + /* Update any Vars referring to the result relation. */ + parsetree = + (Query *) ResolveNew((Node *) parsetree, + parsetree->resultRelation, + 0, + view_rte, + view_targetlist, + parsetree->commandType, + parsetree->resultRelation, + &parsetree->hasSubLinks); + } + + /* + * For UPDATE/DELETE, deal with any quals from the view's jointree. We + * know that there is just one relation in the view, so any references to + * that relation in the view's quals need to be updated to refer to the + * corresponding relation in the main query (the result relation). + * + * If the view is not a security barrier then we simply add the view quals + * to the main query. However, if the view is a security barrier then its + * quals must take precedence over any of the user's quals. Here we just + * make a note of such quals on the relevant RTE so that we can deal with + * them later in rewriteSecurityQuals, which will turn the RTE into a + * subquery. + * + * For INSERT the view's quals are not needed for now. When we implement + * WITH CHECK OPTION, this might be a good time to collect them. + */ + if (parsetree->commandType != CMD_INSERT && + viewquery->jointree->quals != NULL) + { + Node *viewqual = copyObject(viewquery->jointree->quals); + + ChangeVarNodes(viewqual, base_rt_index, parsetree->resultRelation, 0); + + if (RelationIsSecurityView(view)) + { + base_rte = rt_fetch(parsetree->resultRelation, parsetree->rtable); + base_rte->securityQuals = lcons(viewqual, + base_rte->securityQuals); + } + else + AddQual(parsetree, (Node *) viewqual); + } + + return parsetree; + } + + + /* + * rewriteSecurityQual - + * rewrite the specified security barrier qual on a query RTE, turning the + * RTE into a subquery. + */ + static Query * + rewriteSecurityQual(Query *parsetree, int rt_index, + bool is_result_relation, Node *qual) + { + RangeTblEntry *rte; + List *targetlist = NIL; + List *inverse_targetlist = NIL; + List *colnames = NIL; + Relation relation; + AttrNumber attno; + Var *var; + TargetEntry *tle; + Query *subquery; + RangeTblEntry *subrte; + RangeTblRef *subrtr; + ListCell *cell; + + rte = rt_fetch(rt_index, parsetree->rtable); + + /* + * There are 2 possible cases: + * + * 1). A relation RTE, which we turn into a subquery RTE containing all + * referenced columns. + * + * 2). A subquery RTE (either from a prior call to this function or from + * an expanded view). In this case we build a new subquery on top of it + * to isolate this security barrier qual from any other quals. + */ + switch (rte->rtekind) + { + case RTE_RELATION: + /* + * Build the subquery targetlist from the columns used from the + * underlying table and an inverse targetlist to map varattnos in + * the main query onto the new subquery columns. + */ + relation = heap_open(rte->relid, NoLock); + + for (attno = FirstLowInvalidHeapAttributeNumber; + attno <= relation->rd_att->natts; attno++) + { + Form_pg_attribute att_tup; + char *attname; + + /* Ignore columns that aren't used */ + if (!attribute_used((Node *) parsetree, rt_index, attno, 0)) + continue; + + if (attno == InvalidAttrNumber) + { + /* whole-row attribute */ + var = makeWholeRowVar(rte, 1, 0, false); + attname = "wholerow"; + } + else + { + /* regular table column or system attribute */ + if (attno >= 1) + att_tup = relation->rd_att->attrs[attno - 1]; + else + att_tup = SystemAttributeDefinition(attno, + relation->rd_rel->relhasoids); + + var = makeVar(1, + attno, + att_tup->atttypid, + att_tup->atttypmod, + att_tup->attcollation, + 0); + attname = NameStr(att_tup->attname); + } + + /* target entry for new subquery targetlist */ + tle = makeTargetEntry((Expr *) var, + list_length(targetlist) + 1, + pstrdup(attname), + false); + targetlist = lappend(targetlist, tle); + + /* inverse target entry for rewriting the main query */ + var = copyObject(var); + var->varattno = list_length(targetlist); + tle = makeTargetEntry((Expr *) var, + attno, + pstrdup(attname), + false); + inverse_targetlist = lappend(inverse_targetlist, tle); + + colnames = lappend(colnames, makeString(pstrdup(attname))); + } + heap_close(relation, NoLock); + + /* + * Turn the main relation RTE into a security barrier subquery + * RTE, moving all permissions checks and rowMarks down into the + * subquery. + */ + subquery = makeNode(Query); + subquery->commandType = CMD_SELECT; + subquery->querySource = QSRC_INSTEAD_RULE; + subquery->targetList = targetlist; + + subrte = copyObject(rte); + subrte->inFromCl = true; + subrte->securityQuals = NIL; + subquery->rtable = list_make1(subrte); + + subrtr = makeNode(RangeTblRef); + subrtr->rtindex = 1; + subquery->jointree = makeFromExpr(list_make1(subrtr), qual); + subquery->hasSubLinks = checkExprHasSubLink(qual); + + foreach(cell, parsetree->rowMarks) + { + RowMarkClause *rc = (RowMarkClause *) lfirst(cell); + if (rc->rti == rt_index) + { + parsetree->rowMarks = list_delete(parsetree->rowMarks, + rc); + rc->rti = 1; + subquery->rowMarks = list_make1(rc); + subquery->hasForUpdate = rc->forUpdate; + break; + } + } + + /* + * If this RTE was the result relation, then we need to lock the + * rows coming from it. Note that by the time we get here, this + * RTE will no longer be the result relation, so we have to rely + * on the flag passed in. + */ + if (is_result_relation) + applyLockingClause(subquery, 1, true, false, true); + + rte->rtekind = RTE_SUBQUERY; + rte->relid = InvalidOid; + rte->subquery = subquery; + rte->security_barrier = true; + rte->eref = makeAlias(rte->eref->aliasname, colnames); + rte->inh = false; /* must not be set for a subquery */ + + /* the permissions checks have now been move down */ + rte->requiredPerms = 0; + rte->checkAsUser = InvalidOid; + rte->selectedCols = NULL; + rte->modifiedCols = NULL; + + /* + * Update any varattnos in the main query that refer to this RTE, + * using the entries from the inverse targetlist. + */ + return (Query *) ResolveNew((Node *) parsetree, + rt_index, + 0, + rte, + inverse_targetlist, + parsetree->commandType, + rt_index, + &parsetree->hasSubLinks); + + case RTE_SUBQUERY: + /* + * Build a new subquery that includes all the same columns as the + * original subquery. + */ + foreach(cell, rte->subquery->targetList) + { + tle = (TargetEntry *) lfirst(cell); + var = makeVarFromTargetEntry(1, tle); + + tle = makeTargetEntry((Expr *) var, + list_length(targetlist) + 1, + pstrdup(tle->resname), + tle->resjunk); + targetlist = lappend(targetlist, tle); + } + + subquery = makeNode(Query); + subquery->commandType = CMD_SELECT; + subquery->querySource = QSRC_INSTEAD_RULE; + subquery->targetList = targetlist; + + subrte = makeNode(RangeTblEntry); + subrte->rtekind = RTE_SUBQUERY; + subrte->subquery = rte->subquery; + subrte->security_barrier = rte->security_barrier; + subrte->eref = copyObject(rte->eref); + subrte->inFromCl = true; + subquery->rtable = list_make1(subrte); + + subrtr = makeNode(RangeTblRef); + subrtr->rtindex = 1; + subquery->jointree = makeFromExpr(list_make1(subrtr), qual); + subquery->hasSubLinks = checkExprHasSubLink(qual); + + rte->subquery = subquery; + rte->security_barrier = true; + + return parsetree; + + default: + elog(ERROR, "invalid range table entry for security barrier qual"); + } + + return NULL; + } + + + /* + * rewriteSecurityQualsOnSubLink - + * Apply rewriteSecurityQuals() to each SubLink (subselect in expression) + * found in the given tree. + * + * NOTE: although this has the form of a walker, we cheat and modify the + * SubLink nodes in-place. This is safe because we are not descending into + * subqueries, so no parts of the tree we are modifying are being traversed. + * + * Each SubLink subselect is replaced with a possibly-rewritten subquery. + */ + static bool + rewriteSecurityQualsOnSubLink(Node *node, void *ctx) + { + if (node == NULL) + return false; + if (IsA(node, SubLink)) + { + SubLink *sub = (SubLink *) node; + + sub->subselect = (Node *) + rewriteSecurityQuals((Query *) sub->subselect); + /* Fall through to process lefthand args of SubLink */ + } + + /* + * Do NOT recurse into Query nodes, because rewriteSecurityQuals already + * processed all subqueries in the rtable and cteList. + */ + return expression_tree_walker(node, rewriteSecurityQualsOnSubLink, ctx); + } + + + /* + * rewriteSecurityQuals - + * rewrites any security barrier quals on RTEs in the query, turning them + * into subqueries to allow the planner to enforce them before any user + * quals where necessary. Currently such security barrier quals can only + * have come from automatically updatable security barrier views. + * + * We do this at the end of the rewriting process (after any SELECT rules + * have been applied) so that the new security barrier subqueries wrap any + * remaining views after they are expanded. + * + * Any given RTE may have multiple security barrier quals in a list, from + * which we create a set of nested subqueries to isolate each security barrier + * from the others, providing protection against malicious user security + * barriers. The first item in the list represents the innermost subquery. + */ + static Query * + rewriteSecurityQuals(Query *parsetree) + { + ListCell *l; + int rt_index; + + /* + * Process each RTE in the rtable list. Security barrier quals are + * initially only added to the result relation, but subsequent rules + * may change that, so they may be anywhere. + * + * Note that this is deliberately not a foreach loop, since the whole + * parsetree may be mutated each time through the loop. + */ + rt_index = 0; + while (rt_index < list_length(parsetree->rtable)) + { + RangeTblEntry *rte; + bool is_result_relation; + int qual_idx; + + ++rt_index; + rte = rt_fetch(rt_index, parsetree->rtable); + + if (rte->securityQuals == NIL) + continue; + + /* + * Ignore any RTEs that aren't used in the query (such RTEs may be + * present for permissions checks). + */ + if (rt_index != parsetree->resultRelation && + !rangeTableEntry_used((Node *) parsetree, rt_index, 0)) + continue; + + /* + * Recursively process any security barrier quals in subquery RTEs + * before processing any at this query level. + */ + if (rte->rtekind == RTE_SUBQUERY) + rte->subquery = rewriteSecurityQuals(rte->subquery); + + /* + * If this RTE is the target then we need to make a copy of it before + * expanding it. The unexpanded copy will become the new target, and + * the expanded RTE will be the source of rows to update/delete. + */ + is_result_relation = rt_index == parsetree->resultRelation; + if (is_result_relation) + { + RangeTblEntry *newrte = copyObject(rte); + parsetree->rtable = lappend(parsetree->rtable, newrte); + parsetree->resultRelation = list_length(parsetree->rtable); + + /* + * Wipe out any copied security quals on the new target to prevent + * infinite recursion. + */ + newrte->securityQuals = NIL; + + /* + * There's no need to do permissions checks twice, so wipe out the + * permissions info for the original RTE (we prefer to keep the + * bits set on the result RTE). + */ + rte->requiredPerms = 0; + rte->checkAsUser = InvalidOid; + rte->selectedCols = NULL; + rte->modifiedCols = NULL; + + /* + * For the most part, Vars referencing the original relation + * should remain as as they are, meaning that they implicitly + * represent OLD values. But in the RETURNING list if any, we + * want such Vars to represent NEW values, so change them to + * reference the new RTE. + * + * Since ChangeVarNodes scribbles on the tree in-place, copy the + * RETURNING list first for safety. + */ + parsetree->returningList = copyObject(parsetree->returningList); + ChangeVarNodes((Node *) parsetree->returningList, rt_index, + parsetree->resultRelation, 0); + } + + /* + * Process each security qual in turn, starting with the innermost + * one and working outwards. + * + * Note that we can't use a foreach loop here because the whole + * parsetree may be mutated each time through the loop. For the same + * reason we must re-fetch the RTE each time. + */ + qual_idx = 0; + while (qual_idx < list_length(rte->securityQuals)) + { + Node *qual = (Node *) list_nth(rte->securityQuals, qual_idx); + + parsetree = rewriteSecurityQual(parsetree, rt_index, + is_result_relation, qual); + + /* re-fetch the RTE in case it has been re-written */ + rte = rt_fetch(rt_index, parsetree->rtable); + + qual_idx++; + } + rte->securityQuals = NIL; + } + + /* Recurse into subqueries in WITH */ + foreach(l, parsetree->cteList) + { + CommonTableExpr *cte = (CommonTableExpr *) lfirst(l); + + cte->ctequery = (Node *) + rewriteSecurityQuals((Query *) cte->ctequery); + } + + /* + * Recurse into sublink subqueries too, without descending into the rtable + * or the cteList, which we have already processed. + */ + if (parsetree->hasSubLinks) + query_tree_walker(parsetree, rewriteSecurityQualsOnSubLink, NULL, + QTW_IGNORE_RC_SUBQUERIES); + + return parsetree; + } + + + /* * RewriteQuery - * rewrites the query and apply the rules again on the queries rewritten * *************** RewriteQuery(Query *parsetree, List *rew *** 1842,1847 **** --- 2658,2664 ---- bool instead = false; bool returning = false; Query *qual_product = NULL; + List *product_queries = NIL; List *rewritten = NIL; ListCell *lc1; *************** RewriteQuery(Query *parsetree, List *rew *** 1993,2001 **** result_relation, parsetree); if (locks != NIL) - { - List *product_queries; - product_queries = fireRules(parsetree, result_relation, event, --- 2810,2815 ---- *************** RewriteQuery(Query *parsetree, List *rew *** 2004,2009 **** --- 2818,2848 ---- &returning, &qual_product); + /* + * If there are no unqualified INSTEAD rules, and the target relation + * is a view without any INSTEAD OF triggers, then we have a problem + * unless it can be automatically updated. + * + * If we can automatically update the view, then we do so here and add + * the resulting query to the product queries list, so that it gets + * recursively rewritten if necessary. We mark this as an unqualified + * INSTEAD to prevent the original query from being executed. Note + * also that if this succeeds it will have rewritten any returning + * list too. + */ + if (!instead && rt_entry_relation->rd_rel->relkind == RELKIND_VIEW) + { + Query *query = qual_product ? qual_product : parsetree; + Query *newquery = rewriteTargetView(query, rt_entry_relation); + + if (newquery != NULL) + { + product_queries = lappend(product_queries, newquery); + instead = true; + returning = true; + } + } + /* * If we got any product queries, recursively rewrite them --- but * first check for recursion! *************** RewriteQuery(Query *parsetree, List *rew *** 2040,2046 **** rewrite_events = list_delete_first(rewrite_events); } - } /* * If there is an INSTEAD, and the original query has a RETURNING, we --- 2879,2884 ---- *************** QueryRewrite(Query *parsetree) *** 2181,2187 **** /* * Step 2 * ! * Apply all the RIR rules on each query * * This is also a handy place to mark each query with the original queryId */ --- 3019,3026 ---- /* * Step 2 * ! * Apply all the RIR rules on each query, and then expand any security ! * quals that apply to RTEs in the query. * * This is also a handy place to mark each query with the original queryId */ *************** QueryRewrite(Query *parsetree) *** 2191,2196 **** --- 3030,3036 ---- Query *query = (Query *) lfirst(l); query = fireRIRrules(query, NIL, false); + query = rewriteSecurityQuals(query); query->queryId = input_query_id; *************** QueryRewrite(Query *parsetree) *** 2245,2247 **** --- 3085,3212 ---- return results; } + + + /* + * is_relation_updatable - test if the specified relation is updatable. + * + * This is used for the information schema views, which have separate concepts + * of "updatable" and "trigger updatable" although their precise definitions + * are not entirely clear in the SQL standard. We take "updatable" to mean + * that the relation can be updated without the need for triggers (either + * because it has a suitable update rule, or because it is simple enough to be + * automatically updated), and "trigger updatable" means it has a suitable + * INSTEAD OF trigger. + * + * In the case of an automatically updatable view, the base relation must also + * be updatable, which we take to mean the base relation is either updatable + * or trigger updatable. + */ + static bool + is_relation_updatable(Oid oid, CmdType event, bool includeTriggers) + { + Relation rel; + RuleLock *rulelocks; + + rel = heap_open(oid, AccessShareLock); + + /* Look for an unconditional DO INSTEAD rule */ + rulelocks = rel->rd_rules; + if (rulelocks != NULL) + { + int i; + + for (i = 0; i < rulelocks->numLocks; i++) + { + if (rulelocks->rules[i]->event == event && + rulelocks->rules[i]->isInstead && + rulelocks->rules[i]->qual == NULL) + { + heap_close(rel, AccessShareLock); + return true; + } + } + } + + /* Maybe also check for an INSTEAD OF trigger */ + if (includeTriggers) + { + TriggerDesc *trigDesc = rel->trigdesc; + bool updatable = false; + + switch (event) + { + case CMD_INSERT: + updatable = trigDesc && trigDesc->trig_insert_instead_row; + break; + + case CMD_UPDATE: + updatable = trigDesc && trigDesc->trig_update_instead_row; + break; + + case CMD_DELETE: + updatable = trigDesc && trigDesc->trig_delete_instead_row; + break; + + default: + elog(ERROR, "unrecognized CmdType: %d", (int) event); + break; + } + + if (updatable) + { + heap_close(rel, AccessShareLock); + return true; + } + } + + /* Check if this is an automatically updatable view */ + if (rel->rd_rel->relkind == RELKIND_VIEW && + test_auto_update_view(rel) == NULL) + { + Query *viewquery; + RangeTblRef *rtr; + RangeTblEntry *base_rte; + Oid baseoid; + + /* The base relation must also be updatable */ + viewquery = get_view_query(rel); + rtr = (RangeTblRef *) linitial(viewquery->jointree->fromlist); + base_rte = rt_fetch(rtr->rtindex, viewquery->rtable); + + if (base_rte->relkind == RELKIND_RELATION) + { + /* Tables are always updatable */ + heap_close(rel, AccessShareLock); + return true; + } + else + { + /* Do a recursive check for any other kind of base relation */ + baseoid = base_rte->relid; + heap_close(rel, AccessShareLock); + return is_relation_updatable(baseoid, event, true); + } + } + + /* If we reach here, the relation is not updatable */ + heap_close(rel, AccessShareLock); + return false; + } + + Datum + pg_is_view_updatable(PG_FUNCTION_ARGS) + { + Oid viewoid = PG_GETARG_OID(0); + + PG_RETURN_BOOL(is_relation_updatable(viewoid, CMD_UPDATE, false) && + is_relation_updatable(viewoid, CMD_DELETE, false)); + } + + Datum + pg_is_view_insertable(PG_FUNCTION_ARGS) + { + Oid viewoid = PG_GETARG_OID(0); + + PG_RETURN_BOOL(is_relation_updatable(viewoid, CMD_INSERT, false)); + } diff --git a/src/backend/rewrite/rewriteManip.c b/src/backend/rewrite/rewriteManip.c new file mode 100644 index ef04c34..d9bd5b4 *** a/src/backend/rewrite/rewriteManip.c --- b/src/backend/rewrite/rewriteManip.c *************** *** 13,18 **** --- 13,19 ---- */ #include "postgres.h" + #include "access/sysattr.h" #include "catalog/pg_type.h" #include "nodes/makefuncs.h" #include "nodes/nodeFuncs.h" *************** ResolveNew(Node *node, int target_varno, *** 1443,1445 **** --- 1444,1486 ---- (void *) &context, outer_hasSubLinks); } + + + /* + * adjust_column_set - replace columns in a set with entries from a targetlist + * + * Non-system columns in the set are replaced by the entry with matching resno + * from targetlist, and system columns are left unchanged. This is used for + * simply updatable views to map permissions checks on the view columns onto + * the matching columns in the underlying base relation. + */ + Bitmapset * + adjust_column_set(Bitmapset *cols, List *targetlist) + { + Bitmapset *tmpcols = bms_copy(cols); + Bitmapset *result = NULL; + AttrNumber col; + + while ((col = bms_first_member(tmpcols)) >= 0) + { + AttrNumber attno = col + FirstLowInvalidHeapAttributeNumber; + + if (attno <= 0) + /* system column - leave it unchanged */ + result = bms_add_member(result, col); + else if (attno <= list_length(targetlist)) + { + /* non-system column - update it from the targetlist */ + TargetEntry *tle = get_tle_by_resno(targetlist, attno); + if (tle != NULL && IsA(tle->expr, Var)) + { + Var *var = (Var *) tle->expr; + result = bms_add_member(result, + var->varattno - FirstLowInvalidHeapAttributeNumber); + } + } + } + bms_free(tmpcols); + + return result; + } diff --git a/src/include/catalog/pg_proc.h b/src/include/catalog/pg_proc.h new file mode 100644 index 77a3b41..c0673b7 *** a/src/include/catalog/pg_proc.h --- b/src/include/catalog/pg_proc.h *************** DATA(insert OID = 2232 ( pg_get_functio *** 1958,1963 **** --- 1958,1967 ---- DESCR("identity argument list of a function"); DATA(insert OID = 2165 ( pg_get_function_result PGNSP PGUID 12 1 0 0 0 f f f f t f s 1 0 25 "26" _null_ _null_ _null_ _null_ pg_get_function_result _null_ _null_ _null_ )); DESCR("result type of a function"); + DATA(insert OID = 3170 ( pg_is_view_updatable PGNSP PGUID 12 1 0 0 0 f f f f t f s 1 0 16 "26" _null_ _null_ _null_ _null_ pg_is_view_updatable _null_ _null_ _null_ )); + DESCR("whether or not a view is updatable"); + DATA(insert OID = 3171 ( pg_is_view_insertable PGNSP PGUID 12 1 0 0 0 f f f f t f s 1 0 16 "26" _null_ _null_ _null_ _null_ pg_is_view_insertable _null_ _null_ _null_ )); + DESCR("whether or not a view is insertable"); DATA(insert OID = 1686 ( pg_get_keywords PGNSP PGUID 12 10 400 0 0 f f f f t t s 0 0 2249 "" "{25,18,25}" "{o,o,o}" "{word,catcode,catdesc}" _null_ pg_get_keywords _null_ _null_ _null_ )); DESCR("list of SQL keywords"); diff --git a/src/include/nodes/parsenodes.h b/src/include/nodes/parsenodes.h new file mode 100644 index 19178b5..08eebd5 *** a/src/include/nodes/parsenodes.h --- b/src/include/nodes/parsenodes.h *************** typedef struct RangeTblEntry *** 765,770 **** --- 765,771 ---- Oid checkAsUser; /* if valid, check access as this role */ Bitmapset *selectedCols; /* columns needing SELECT permission */ Bitmapset *modifiedCols; /* columns needing INSERT/UPDATE permission */ + List *securityQuals; /* any security barrier quals to apply */ } RangeTblEntry; /* diff --git a/src/include/rewrite/rewriteHandler.h b/src/include/rewrite/rewriteHandler.h new file mode 100644 index 50625d4..9879c5c *** a/src/include/rewrite/rewriteHandler.h --- b/src/include/rewrite/rewriteHandler.h *************** *** 20,24 **** --- 20,26 ---- extern List *QueryRewrite(Query *parsetree); extern void AcquireRewriteLocks(Query *parsetree, bool forUpdatePushedDown); extern Node *build_column_default(Relation rel, int attrno); + extern Datum pg_is_view_updatable(PG_FUNCTION_ARGS); + extern Datum pg_is_view_insertable(PG_FUNCTION_ARGS); #endif /* REWRITEHANDLER_H */ diff --git a/src/include/rewrite/rewriteManip.h b/src/include/rewrite/rewriteManip.h new file mode 100644 index e13331d..4e0b867 *** a/src/include/rewrite/rewriteManip.h --- b/src/include/rewrite/rewriteManip.h *************** extern Node *ResolveNew(Node *node, int *** 74,77 **** --- 74,79 ---- List *targetlist, int event, int update_varno, bool *outer_hasSubLinks); + extern Bitmapset *adjust_column_set(Bitmapset *cols, List *targetlist); + #endif /* REWRITEMANIP_H */ diff --git a/src/test/regress/expected/triggers.out b/src/test/regress/expected/triggers.out new file mode 100644 index b5af066..5439120 *** a/src/test/regress/expected/triggers.out --- b/src/test/regress/expected/triggers.out *************** DROP TABLE min_updates_test_oids; *** 820,839 **** -- Test triggers on views -- CREATE VIEW main_view AS SELECT a, b FROM main_table; - -- Updates should fail without rules or triggers - INSERT INTO main_view VALUES (1,2); - ERROR: cannot insert into view "main_view" - HINT: You need an unconditional ON INSERT DO INSTEAD rule or an INSTEAD OF INSERT trigger. - UPDATE main_view SET b = 20 WHERE a = 50; - ERROR: cannot update view "main_view" - HINT: You need an unconditional ON UPDATE DO INSTEAD rule or an INSTEAD OF UPDATE trigger. - DELETE FROM main_view WHERE a = 50; - ERROR: cannot delete from view "main_view" - HINT: You need an unconditional ON DELETE DO INSTEAD rule or an INSTEAD OF DELETE trigger. - -- Should fail even when there are no matching rows - DELETE FROM main_view WHERE a = 51; - ERROR: cannot delete from view "main_view" - HINT: You need an unconditional ON DELETE DO INSTEAD rule or an INSTEAD OF DELETE trigger. -- VIEW trigger function CREATE OR REPLACE FUNCTION view_trigger() RETURNS trigger LANGUAGE plpgsql AS $$ --- 820,825 ---- diff --git a/src/test/regress/expected/updatable_views.out b/src/test/regress/expected/updatable_views.out new file mode 100644 index ...3c0a424 *** a/src/test/regress/expected/updatable_views.out --- b/src/test/regress/expected/updatable_views.out *************** *** 0 **** --- 1,744 ---- + -- + -- UPDATABLE VIEWS + -- + CREATE TABLE base_tbl (a int PRIMARY KEY, b text DEFAULT 'Unspecified'); + INSERT INTO base_tbl VALUES (1, 'Row 1'); + INSERT INTO base_tbl VALUES (2, 'Row 2'); + -- non-updatable views + CREATE VIEW ro_view1 AS SELECT DISTINCT a, b FROM base_tbl; -- DISTINCT not supported + CREATE VIEW ro_view2 AS SELECT a, b FROM base_tbl GROUP BY a, b; -- GROUP BY not supported + CREATE VIEW ro_view3 AS SELECT 1 FROM base_tbl HAVING max(a) > 0; -- HAVING not supported + CREATE VIEW ro_view4 AS SELECT count(*) FROM base_tbl; -- Aggregate functions not supported + CREATE VIEW ro_view5 AS SELECT a, rank() OVER() FROM base_tbl; -- Window functions not supported + CREATE VIEW ro_view6 AS SELECT a, b FROM base_tbl UNION SELECT -a, b FROM base_tbl; -- Set ops not supported + CREATE VIEW ro_view7 AS WITH t AS (SELECT a, b FROM base_tbl) SELECT * FROM t; -- WITH [RECURSIVE] not supported + CREATE VIEW ro_view8 AS SELECT a, b FROM base_tbl ORDER BY a OFFSET 1; -- OFFSET not supported + CREATE VIEW ro_view9 AS SELECT a, b FROM base_tbl ORDER BY a LIMIT 1; -- LIMIT not supported + CREATE VIEW ro_view10 AS SELECT 1 AS a; -- No base relations + CREATE VIEW ro_view11 AS SELECT b1.a, b2.b FROM base_tbl b1, base_tbl b2; -- Multiple base relations + CREATE VIEW ro_view12 AS SELECT * FROM generate_series(1, 10) AS g(a); -- SRF in rangetable + CREATE VIEW ro_view13 AS SELECT a, b FROM (SELECT * FROM base_tbl) AS t; -- Subselect in rangetable + CREATE VIEW ro_view14 AS SELECT ctid FROM base_tbl; -- System columns not supported + CREATE VIEW ro_view15 AS SELECT a, upper(b) FROM base_tbl; -- Expression/function in targetlist + CREATE VIEW ro_view16 AS SELECT a, b, a AS aa FROM base_tbl; -- Repeated column + CREATE VIEW ro_view17 AS SELECT * FROM ro_view1; -- Base relation not updatable + SELECT table_name, is_updatable, is_insertable_into + FROM information_schema.views + WHERE table_name LIKE 'ro_view%' + ORDER BY table_name; + table_name | is_updatable | is_insertable_into + ------------+--------------+-------------------- + ro_view1 | NO | NO + ro_view10 | NO | NO + ro_view11 | NO | NO + ro_view12 | NO | NO + ro_view13 | NO | NO + ro_view14 | NO | NO + ro_view15 | NO | NO + ro_view16 | NO | NO + ro_view17 | NO | NO + ro_view2 | NO | NO + ro_view3 | NO | NO + ro_view4 | NO | NO + ro_view5 | NO | NO + ro_view6 | NO | NO + ro_view7 | NO | NO + ro_view8 | NO | NO + ro_view9 | NO | NO + (17 rows) + + DELETE FROM ro_view1; + ERROR: cannot delete from view "ro_view1" + DETAIL: Views containing DISTINCT are not updatable. + HINT: You need an unconditional ON DELETE DO INSTEAD rule or an INSTEAD OF DELETE trigger. + DELETE FROM ro_view2; + ERROR: cannot delete from view "ro_view2" + DETAIL: Views containing GROUP BY are not updatable. + HINT: You need an unconditional ON DELETE DO INSTEAD rule or an INSTEAD OF DELETE trigger. + DELETE FROM ro_view3; + ERROR: cannot delete from view "ro_view3" + DETAIL: Views containing HAVING are not updatable. + HINT: You need an unconditional ON DELETE DO INSTEAD rule or an INSTEAD OF DELETE trigger. + DELETE FROM ro_view4; + ERROR: cannot delete from view "ro_view4" + DETAIL: Views containing aggregates are not updatable. + HINT: You need an unconditional ON DELETE DO INSTEAD rule or an INSTEAD OF DELETE trigger. + DELETE FROM ro_view5; + ERROR: cannot delete from view "ro_view5" + DETAIL: Views containing window functions are not updatable. + HINT: You need an unconditional ON DELETE DO INSTEAD rule or an INSTEAD OF DELETE trigger. + DELETE FROM ro_view6; + ERROR: cannot delete from view "ro_view6" + DETAIL: Views containing UNION, INTERSECT or EXCEPT are not updatable. + HINT: You need an unconditional ON DELETE DO INSTEAD rule or an INSTEAD OF DELETE trigger. + UPDATE ro_view7 SET a=a+1; + ERROR: cannot update view "ro_view7" + DETAIL: Views containing WITH [RECURSIVE] are not updatable. + HINT: You need an unconditional ON UPDATE DO INSTEAD rule or an INSTEAD OF UPDATE trigger. + UPDATE ro_view8 SET a=a+1; + ERROR: cannot update view "ro_view8" + DETAIL: Views containing OFFSET or LIMIT are not updatable. + HINT: You need an unconditional ON UPDATE DO INSTEAD rule or an INSTEAD OF UPDATE trigger. + UPDATE ro_view9 SET a=a+1; + ERROR: cannot update view "ro_view9" + DETAIL: Views containing OFFSET or LIMIT are not updatable. + HINT: You need an unconditional ON UPDATE DO INSTEAD rule or an INSTEAD OF UPDATE trigger. + UPDATE ro_view10 SET a=a+1; + ERROR: cannot update view "ro_view10" + DETAIL: Views with no base relations are not updatable. + HINT: You need an unconditional ON UPDATE DO INSTEAD rule or an INSTEAD OF UPDATE trigger. + UPDATE ro_view11 SET a=a+1; + ERROR: cannot update view "ro_view11" + DETAIL: Views with multiple base relations are not updatable. + HINT: You need an unconditional ON UPDATE DO INSTEAD rule or an INSTEAD OF UPDATE trigger. + UPDATE ro_view12 SET a=a+1; + ERROR: cannot update view "ro_view12" + DETAIL: Views that are not based on tables or views are not updatable. + HINT: You need an unconditional ON UPDATE DO INSTEAD rule or an INSTEAD OF UPDATE trigger. + INSERT INTO ro_view13 VALUES (3, 'Row 3'); + ERROR: cannot insert into view "ro_view13" + DETAIL: Views that are not based on tables or views are not updatable. + HINT: You need an unconditional ON INSERT DO INSTEAD rule or an INSTEAD OF INSERT trigger. + INSERT INTO ro_view14 VALUES (null); + ERROR: cannot insert into view "ro_view14" + DETAIL: Views that refer to system columns are not updatable. + HINT: You need an unconditional ON INSERT DO INSTEAD rule or an INSTEAD OF INSERT trigger. + INSERT INTO ro_view15 VALUES (3, 'ROW 3'); + ERROR: cannot insert into view "ro_view15" + DETAIL: Views with columns that are not simple references to columns in the base relation are not updatable. + HINT: You need an unconditional ON INSERT DO INSTEAD rule or an INSTEAD OF INSERT trigger. + INSERT INTO ro_view16 VALUES (3, 'Row 3', 3); + ERROR: cannot insert into view "ro_view16" + DETAIL: Views that refer to the same column more than once are not updatable. + HINT: You need an unconditional ON INSERT DO INSTEAD rule or an INSTEAD OF INSERT trigger. + INSERT INTO ro_view17 VALUES (3, 'ROW 3'); + ERROR: cannot insert into view "ro_view1" + DETAIL: Views containing DISTINCT are not updatable. + HINT: You need an unconditional ON INSERT DO INSTEAD rule or an INSTEAD OF INSERT trigger. + DROP VIEW ro_view1, ro_view2, ro_view3, ro_view4, ro_view5, + ro_view6, ro_view7, ro_view8, ro_view9, ro_view10, + ro_view11, ro_view12, ro_view13, ro_view14, ro_view15, + ro_view16, ro_view17; + -- simple updatable view + CREATE VIEW rw_view1 AS SELECT * FROM base_tbl; + SELECT table_name, is_updatable, is_insertable_into + FROM information_schema.views + WHERE table_name = 'rw_view1'; + table_name | is_updatable | is_insertable_into + ------------+--------------+-------------------- + rw_view1 | YES | YES + (1 row) + + INSERT INTO rw_view1 VALUES (3, 'Row 3'); + INSERT INTO rw_view1 (a) VALUES (4); + UPDATE rw_view1 SET a=0 WHERE a=1; + DELETE FROM rw_view1 WHERE b='Row 2'; + SELECT * FROM base_tbl; + a | b + ---+------------- + 3 | Row 3 + 4 | Unspecified + 0 | Row 1 + (3 rows) + + EXPLAIN (costs off) UPDATE rw_view1 SET a=1 WHERE a=0; + QUERY PLAN + -------------------------------------------------- + Update on base_tbl + -> Index Scan using base_tbl_pkey on base_tbl + Index Cond: (a = 0) + (3 rows) + + EXPLAIN (costs off) DELETE FROM rw_view1 WHERE a=0; + QUERY PLAN + -------------------------------------------------- + Delete on base_tbl + -> Index Scan using base_tbl_pkey on base_tbl + Index Cond: (a = 0) + (3 rows) + + -- view on top of view hiding private data + INSERT INTO base_tbl VALUES (-1, 'Private data'); + CREATE VIEW rw_view2 AS SELECT * FROM rw_view1 WHERE b !~* 'private'; + SELECT table_name, is_updatable, is_insertable_into + FROM information_schema.views + WHERE table_name = 'rw_view2'; + table_name | is_updatable | is_insertable_into + ------------+--------------+-------------------- + rw_view2 | YES | YES + (1 row) + + SELECT * FROM base_tbl; + a | b + ----+-------------- + 3 | Row 3 + 4 | Unspecified + 0 | Row 1 + -1 | Private data + (4 rows) + + SELECT * FROM rw_view2; + a | b + ---+------------- + 3 | Row 3 + 4 | Unspecified + 0 | Row 1 + (3 rows) + + INSERT INTO rw_view2 VALUES (5, 'Row 5'); + INSERT INTO rw_view2 (a) VALUES (6); + SELECT * FROM rw_view2; + a | b + ---+------------- + 3 | Row 3 + 4 | Unspecified + 0 | Row 1 + 5 | Row 5 + 6 | Unspecified + (5 rows) + + UPDATE rw_view2 SET b='Row 6' WHERE a=6; + DELETE FROM rw_view2 WHERE a=5; + SELECT * FROM rw_view2; + a | b + ---+------------- + 3 | Row 3 + 4 | Unspecified + 0 | Row 1 + 6 | Row 6 + (4 rows) + + EXPLAIN (costs off) UPDATE rw_view2 SET a=1 WHERE a=0; + QUERY PLAN + -------------------------------------------------- + Update on base_tbl + -> Index Scan using base_tbl_pkey on base_tbl + Index Cond: (a = 0) + Filter: (b !~* 'private'::text) + (4 rows) + + EXPLAIN (costs off) DELETE FROM rw_view2 WHERE a=0; + QUERY PLAN + -------------------------------------------------- + Delete on base_tbl + -> Index Scan using base_tbl_pkey on base_tbl + Index Cond: (a = 0) + Filter: (b !~* 'private'::text) + (4 rows) + + -- snoop on private value + CREATE FUNCTION snoop(a int, b text) + RETURNS boolean AS + $$ + BEGIN + RAISE NOTICE 'a=%, b=%', a, b; + RETURN true; + END; + $$ + LANGUAGE plpgsql COST 0.000001; + SELECT * FROM rw_view2 WHERE snoop(a,b); + NOTICE: a=3, b=Row 3 + NOTICE: a=4, b=Unspecified + NOTICE: a=0, b=Row 1 + NOTICE: a=-1, b=Private data + NOTICE: a=6, b=Row 6 + a | b + ---+------------- + 3 | Row 3 + 4 | Unspecified + 0 | Row 1 + 6 | Row 6 + (4 rows) + + UPDATE rw_view2 SET a=a WHERE snoop(a,b); + NOTICE: a=3, b=Row 3 + NOTICE: a=4, b=Unspecified + NOTICE: a=0, b=Row 1 + NOTICE: a=-1, b=Private data + NOTICE: a=6, b=Row 6 + DELETE FROM rw_view2 WHERE NOT snoop(a,b); + NOTICE: a=-1, b=Private data + NOTICE: a=3, b=Row 3 + NOTICE: a=4, b=Unspecified + NOTICE: a=0, b=Row 1 + NOTICE: a=6, b=Row 6 + EXPLAIN (costs off) SELECT * FROM rw_view2 WHERE snoop(a,b); + QUERY PLAN + ----------------------------------------------------- + Seq Scan on base_tbl + Filter: (snoop(a, b) AND (b !~* 'private'::text)) + (2 rows) + + EXPLAIN (costs off) UPDATE rw_view2 SET a=a WHERE snoop(a,b); + QUERY PLAN + ----------------------------------------------------------- + Update on base_tbl + -> Seq Scan on base_tbl + Filter: (snoop(a, b) AND (b !~* 'private'::text)) + (3 rows) + + EXPLAIN (costs off) DELETE FROM rw_view2 WHERE NOT snoop(a,b); + QUERY PLAN + ----------------------------------------------------------------- + Delete on base_tbl + -> Seq Scan on base_tbl + Filter: ((NOT snoop(a, b)) AND (b !~* 'private'::text)) + (3 rows) + + -- security barrier view prevents snooping + ALTER VIEW rw_view2 SET (security_barrier = true); + SELECT * FROM rw_view2 WHERE snoop(a,b); + NOTICE: a=3, b=Row 3 + NOTICE: a=4, b=Unspecified + NOTICE: a=0, b=Row 1 + NOTICE: a=6, b=Row 6 + a | b + ---+------------- + 3 | Row 3 + 4 | Unspecified + 0 | Row 1 + 6 | Row 6 + (4 rows) + + UPDATE rw_view2 SET a=a WHERE snoop(a,b); + NOTICE: a=3, b=Row 3 + NOTICE: a=4, b=Unspecified + NOTICE: a=0, b=Row 1 + NOTICE: a=6, b=Row 6 + DELETE FROM rw_view2 WHERE NOT snoop(a,b); + NOTICE: a=3, b=Row 3 + NOTICE: a=4, b=Unspecified + NOTICE: a=0, b=Row 1 + NOTICE: a=6, b=Row 6 + EXPLAIN (costs off) SELECT * FROM rw_view2 WHERE snoop(a,b); + QUERY PLAN + ----------------------------------------- + Subquery Scan on rw_view2 + Filter: snoop(rw_view2.a, rw_view2.b) + -> Seq Scan on base_tbl + Filter: (b !~* 'private'::text) + (4 rows) + + EXPLAIN (costs off) UPDATE rw_view2 SET a=a WHERE snoop(a,b); + QUERY PLAN + ----------------------------------------------------- + Update on base_tbl + -> Subquery Scan on base_tbl + Filter: snoop(base_tbl.a, base_tbl.b) + -> LockRows + -> Seq Scan on base_tbl + Filter: (b !~* 'private'::text) + (6 rows) + + EXPLAIN (costs off) DELETE FROM rw_view2 WHERE NOT snoop(a,b); + QUERY PLAN + ----------------------------------------------------- + Delete on base_tbl + -> Subquery Scan on base_tbl + Filter: (NOT snoop(base_tbl.a, base_tbl.b)) + -> LockRows + -> Seq Scan on base_tbl + Filter: (b !~* 'private'::text) + (6 rows) + + -- security barrier view on top of security barrier view + CREATE VIEW rw_view3 AS SELECT * FROM rw_view2 WHERE snoop(a,b); + SELECT table_name, is_updatable, is_insertable_into + FROM information_schema.views + WHERE table_name = 'rw_view3'; + table_name | is_updatable | is_insertable_into + ------------+--------------+-------------------- + rw_view3 | YES | YES + (1 row) + + SELECT * FROM rw_view3; + NOTICE: a=3, b=Row 3 + NOTICE: a=4, b=Unspecified + NOTICE: a=0, b=Row 1 + NOTICE: a=6, b=Row 6 + a | b + ---+------------- + 3 | Row 3 + 4 | Unspecified + 0 | Row 1 + 6 | Row 6 + (4 rows) + + UPDATE rw_view3 SET a=a WHERE a=5; + DELETE FROM rw_view3 WHERE a=5; + EXPLAIN (costs off) SELECT * FROM rw_view3; + QUERY PLAN + ----------------------------------------- + Subquery Scan on rw_view2 + Filter: snoop(rw_view2.a, rw_view2.b) + -> Seq Scan on base_tbl + Filter: (b !~* 'private'::text) + (4 rows) + + EXPLAIN (costs off) UPDATE rw_view3 SET a=a WHERE a=5; + QUERY PLAN + -------------------------------------------------------------- + Update on base_tbl + -> Subquery Scan on base_tbl + Filter: snoop(base_tbl.a, base_tbl.b) + -> LockRows + -> Index Scan using base_tbl_pkey on base_tbl + Index Cond: (a = 5) + Filter: (b !~* 'private'::text) + (7 rows) + + EXPLAIN (costs off) DELETE FROM rw_view3 WHERE a=5; + QUERY PLAN + -------------------------------------------------------------- + Delete on base_tbl + -> Subquery Scan on base_tbl + Filter: snoop(base_tbl.a, base_tbl.b) + -> LockRows + -> Index Scan using base_tbl_pkey on base_tbl + Index Cond: (a = 5) + Filter: (b !~* 'private'::text) + (7 rows) + + DROP VIEW rw_view1, rw_view2, rw_view3; + -- view on top of view with rules + CREATE VIEW rw_view1 AS SELECT * FROM base_tbl OFFSET 0; -- not updatable without rules/triggers + CREATE VIEW rw_view2 AS SELECT * FROM rw_view1; + SELECT table_name, is_updatable, is_insertable_into + FROM information_schema.views + WHERE table_name LIKE 'rw_view%' + ORDER BY table_name; + table_name | is_updatable | is_insertable_into + ------------+--------------+-------------------- + rw_view1 | NO | NO + rw_view2 | NO | NO + (2 rows) + + CREATE RULE rw_view1_ins_rule AS ON INSERT TO rw_view1 + DO INSTEAD INSERT INTO base_tbl VALUES (NEW.a, NEW.b) RETURNING *; + SELECT table_name, is_updatable, is_insertable_into + FROM information_schema.views + WHERE table_name LIKE 'rw_view%' + ORDER BY table_name; + table_name | is_updatable | is_insertable_into + ------------+--------------+-------------------- + rw_view1 | NO | YES + rw_view2 | NO | YES + (2 rows) + + CREATE RULE rw_view1_upd_rule AS ON UPDATE TO rw_view1 + DO INSTEAD UPDATE base_tbl SET b=NEW.b WHERE a=OLD.a RETURNING NEW.*; + SELECT table_name, is_updatable, is_insertable_into + FROM information_schema.views + WHERE table_name LIKE 'rw_view%' + ORDER BY table_name; + table_name | is_updatable | is_insertable_into + ------------+--------------+-------------------- + rw_view1 | NO | YES + rw_view2 | NO | YES + (2 rows) + + CREATE RULE rw_view1_del_rule AS ON DELETE TO rw_view1 + DO INSTEAD DELETE FROM base_tbl WHERE a=OLD.a RETURNING OLD.*; + SELECT table_name, is_updatable, is_insertable_into + FROM information_schema.views + WHERE table_name LIKE 'rw_view%' + ORDER BY table_name; + table_name | is_updatable | is_insertable_into + ------------+--------------+-------------------- + rw_view1 | YES | YES + rw_view2 | YES | YES + (2 rows) + + SELECT * FROM rw_view2; + a | b + ----+-------------- + -1 | Private data + 3 | Row 3 + 4 | Unspecified + 0 | Row 1 + 6 | Row 6 + (5 rows) + + INSERT INTO rw_view2 VALUES (7, 'Row 7') RETURNING *; + a | b + ---+------- + 7 | Row 7 + (1 row) + + UPDATE rw_view2 SET b='Row seven' WHERE a=7 RETURNING *; + a | b + ---+----------- + 7 | Row seven + (1 row) + + SELECT * FROM rw_view2; + a | b + ----+-------------- + -1 | Private data + 3 | Row 3 + 4 | Unspecified + 0 | Row 1 + 6 | Row 6 + 7 | Row seven + (6 rows) + + DELETE FROM rw_view2 WHERE a=7 RETURNING *; + a | b + ---+----------- + 7 | Row seven + (1 row) + + -- view on top of view with triggers + DROP RULE rw_view1_ins_rule ON rw_view1; + DROP RULE rw_view1_upd_rule ON rw_view1; + DROP RULE rw_view1_del_rule ON rw_view1; + SELECT table_name, is_updatable, is_insertable_into, + is_trigger_updatable, is_trigger_deletable, + is_trigger_insertable_into + FROM information_schema.views + WHERE table_name LIKE 'rw_view%' + ORDER BY table_name; + table_name | is_updatable | is_insertable_into | is_trigger_updatable | is_trigger_deletable | is_trigger_insertable_into + ------------+--------------+--------------------+----------------------+----------------------+---------------------------- + rw_view1 | NO | NO | NO | NO | NO + rw_view2 | NO | NO | NO | NO | NO + (2 rows) + + CREATE FUNCTION rw_view1_trig_fn() + RETURNS trigger AS + $$ + BEGIN + IF TG_OP = 'INSERT' THEN + INSERT INTO base_tbl VALUES (NEW.a, NEW.b); + RETURN NEW; + ELSIF TG_OP = 'UPDATE' THEN + UPDATE base_tbl SET b=NEW.b WHERE a=OLD.a; + RETURN NEW; + ELSIF TG_OP = 'DELETE' THEN + DELETE FROM base_tbl WHERE a=OLD.a; + RETURN OLD; + END IF; + END; + $$ + LANGUAGE plpgsql; + CREATE TRIGGER rw_view1_ins_trig INSTEAD OF INSERT ON rw_view1 + FOR EACH ROW EXECUTE PROCEDURE rw_view1_trig_fn(); + SELECT table_name, is_updatable, is_insertable_into, + is_trigger_updatable, is_trigger_deletable, + is_trigger_insertable_into + FROM information_schema.views + WHERE table_name LIKE 'rw_view%' + ORDER BY table_name; + table_name | is_updatable | is_insertable_into | is_trigger_updatable | is_trigger_deletable | is_trigger_insertable_into + ------------+--------------+--------------------+----------------------+----------------------+---------------------------- + rw_view1 | NO | NO | NO | NO | YES + rw_view2 | NO | YES | NO | NO | NO + (2 rows) + + CREATE TRIGGER rw_view1_upd_trig INSTEAD OF UPDATE ON rw_view1 + FOR EACH ROW EXECUTE PROCEDURE rw_view1_trig_fn(); + SELECT table_name, is_updatable, is_insertable_into, + is_trigger_updatable, is_trigger_deletable, + is_trigger_insertable_into + FROM information_schema.views + WHERE table_name LIKE 'rw_view%' + ORDER BY table_name; + table_name | is_updatable | is_insertable_into | is_trigger_updatable | is_trigger_deletable | is_trigger_insertable_into + ------------+--------------+--------------------+----------------------+----------------------+---------------------------- + rw_view1 | NO | NO | YES | NO | YES + rw_view2 | NO | YES | NO | NO | NO + (2 rows) + + CREATE TRIGGER rw_view1_del_trig INSTEAD OF DELETE ON rw_view1 + FOR EACH ROW EXECUTE PROCEDURE rw_view1_trig_fn(); + SELECT table_name, is_updatable, is_insertable_into, + is_trigger_updatable, is_trigger_deletable, + is_trigger_insertable_into + FROM information_schema.views + WHERE table_name LIKE 'rw_view%' + ORDER BY table_name; + table_name | is_updatable | is_insertable_into | is_trigger_updatable | is_trigger_deletable | is_trigger_insertable_into + ------------+--------------+--------------------+----------------------+----------------------+---------------------------- + rw_view1 | NO | NO | YES | YES | YES + rw_view2 | YES | YES | NO | NO | NO + (2 rows) + + SELECT * FROM rw_view2; + a | b + ----+-------------- + -1 | Private data + 3 | Row 3 + 4 | Unspecified + 0 | Row 1 + 6 | Row 6 + (5 rows) + + INSERT INTO rw_view2 VALUES (7, 'Row 7') RETURNING *; + a | b + ---+------- + 7 | Row 7 + (1 row) + + UPDATE rw_view2 SET b='Row seven' WHERE a=7 RETURNING *; + a | b + ---+----------- + 7 | Row seven + (1 row) + + SELECT * FROM rw_view2; + a | b + ----+-------------- + -1 | Private data + 3 | Row 3 + 4 | Unspecified + 0 | Row 1 + 6 | Row 6 + 7 | Row seven + (6 rows) + + DELETE FROM rw_view2 WHERE a=7 RETURNING *; + a | b + ---+----------- + 7 | Row seven + (1 row) + + DROP VIEW rw_view1, rw_view2; + -- test whole row from view + CREATE VIEW rw_view1 AS SELECT b AS bb, a AS aa FROM base_tbl; + CREATE FUNCTION rw_view1_aa(x rw_view1) + RETURNS int AS $$ SELECT x.aa $$ LANGUAGE sql; + UPDATE rw_view1 v SET bb='Updated row 6' WHERE rw_view1_aa(v)=6 + RETURNING rw_view1_aa(v), v.bb; + rw_view1_aa | bb + -------------+--------------- + 6 | Updated row 6 + (1 row) + + SELECT * FROM base_tbl; + a | b + ----+--------------- + -1 | Private data + 3 | Row 3 + 4 | Unspecified + 0 | Row 1 + 6 | Updated row 6 + (5 rows) + + EXPLAIN (costs off) + UPDATE rw_view1 v SET bb='Updated row 6' WHERE rw_view1_aa(v)=6 + RETURNING rw_view1_aa(v), v.bb; + QUERY PLAN + -------------------------------------------------- + Update on base_tbl + -> Index Scan using base_tbl_pkey on base_tbl + Index Cond: (a = 6) + (3 rows) + + DROP FUNCTION rw_view1_aa(rw_view1); + DROP VIEW rw_view1; + DROP TABLE base_tbl; + -- permissions checks + CREATE USER view_user1; + CREATE USER view_user2; + SET SESSION AUTHORIZATION view_user1; + CREATE TABLE base_tbl(a int, b text, c float); + INSERT INTO base_tbl VALUES (1, 'Row 1', 1.0); + CREATE VIEW rw_view1 AS SELECT b AS bb, c AS cc, a AS aa FROM base_tbl; + INSERT INTO rw_view1 VALUES ('Row 2', 2.0, 2); + GRANT SELECT ON base_tbl TO view_user2; + GRANT SELECT ON rw_view1 TO view_user2; + GRANT UPDATE (a,c) ON base_tbl TO view_user2; + GRANT UPDATE (bb,cc) ON rw_view1 TO view_user2; + RESET SESSION AUTHORIZATION; + SET SESSION AUTHORIZATION view_user2; + CREATE VIEW rw_view2 AS SELECT b AS bb, c AS cc, a AS aa FROM base_tbl; + SELECT * FROM base_tbl; -- ok + a | b | c + ---+-------+--- + 1 | Row 1 | 1 + 2 | Row 2 | 2 + (2 rows) + + SELECT * FROM rw_view1; -- ok + bb | cc | aa + -------+----+---- + Row 1 | 1 | 1 + Row 2 | 2 | 2 + (2 rows) + + SELECT * FROM rw_view2; -- ok + bb | cc | aa + -------+----+---- + Row 1 | 1 | 1 + Row 2 | 2 | 2 + (2 rows) + + INSERT INTO base_tbl VALUES (3, 'Row 3', 3.0); -- not allowed + ERROR: permission denied for relation base_tbl + INSERT INTO rw_view1 VALUES ('Row 3', 3.0, 3); -- not allowed + ERROR: permission denied for relation rw_view1 + INSERT INTO rw_view2 VALUES ('Row 3', 3.0, 3); -- not allowed + ERROR: permission denied for relation base_tbl + UPDATE base_tbl SET a=a, c=c; -- ok + UPDATE base_tbl SET b=b; -- not allowed + ERROR: permission denied for relation base_tbl + UPDATE rw_view1 SET bb=bb, cc=cc; -- ok + UPDATE rw_view1 SET aa=aa; -- not allowed + ERROR: permission denied for relation rw_view1 + UPDATE rw_view2 SET aa=aa, cc=cc; -- ok + UPDATE rw_view2 SET bb=bb; -- not allowed + ERROR: permission denied for relation base_tbl + DELETE FROM base_tbl; -- not allowed + ERROR: permission denied for relation base_tbl + DELETE FROM rw_view1; -- not allowed + ERROR: permission denied for relation rw_view1 + DELETE FROM rw_view2; -- not allowed + ERROR: permission denied for relation base_tbl + RESET SESSION AUTHORIZATION; + SET SESSION AUTHORIZATION view_user1; + GRANT INSERT, DELETE ON base_tbl TO view_user2; + RESET SESSION AUTHORIZATION; + SET SESSION AUTHORIZATION view_user2; + INSERT INTO base_tbl VALUES (3, 'Row 3', 3.0); -- ok + INSERT INTO rw_view1 VALUES ('Row 4', 4.0, 4); -- not allowed + ERROR: permission denied for relation rw_view1 + INSERT INTO rw_view2 VALUES ('Row 4', 4.0, 4); -- ok + DELETE FROM base_tbl WHERE a=1; -- ok + DELETE FROM rw_view1 WHERE aa=2; -- not allowed + ERROR: permission denied for relation rw_view1 + DELETE FROM rw_view2 WHERE aa=2; -- ok + SELECT * FROM base_tbl; + a | b | c + ---+-------+--- + 3 | Row 3 | 3 + 4 | Row 4 | 4 + (2 rows) + + RESET SESSION AUTHORIZATION; + SET SESSION AUTHORIZATION view_user1; + REVOKE INSERT, DELETE ON base_tbl FROM view_user2; + GRANT INSERT, DELETE ON rw_view1 TO view_user2; + RESET SESSION AUTHORIZATION; + SET SESSION AUTHORIZATION view_user2; + INSERT INTO base_tbl VALUES (5, 'Row 5', 5.0); -- not allowed + ERROR: permission denied for relation base_tbl + INSERT INTO rw_view1 VALUES ('Row 5', 5.0, 5); -- ok + INSERT INTO rw_view2 VALUES ('Row 6', 6.0, 6); -- not allowed + ERROR: permission denied for relation base_tbl + DELETE FROM base_tbl WHERE a=3; -- not allowed + ERROR: permission denied for relation base_tbl + DELETE FROM rw_view1 WHERE aa=3; -- ok + DELETE FROM rw_view2 WHERE aa=4; -- not allowed + ERROR: permission denied for relation base_tbl + SELECT * FROM base_tbl; + a | b | c + ---+-------+--- + 4 | Row 4 | 4 + 5 | Row 5 | 5 + (2 rows) + + RESET SESSION AUTHORIZATION; + DROP VIEW rw_view1, rw_view2; + DROP TABLE base_tbl; + DROP USER view_user1; + DROP USER view_user2; diff --git a/src/test/regress/parallel_schedule b/src/test/regress/parallel_schedule new file mode 100644 index ac29194..e25031e *** a/src/test/regress/parallel_schedule --- b/src/test/regress/parallel_schedule *************** test: create_index create_view *** 59,65 **** # ---------- # Another group of parallel tests # ---------- ! test: create_aggregate create_function_3 create_cast constraints triggers inherit create_table_like typed_table vacuum drop_if_exists # ---------- # sanity_check does a vacuum, affecting the sort order of SELECT * --- 59,65 ---- # ---------- # Another group of parallel tests # ---------- ! test: create_aggregate create_function_3 create_cast constraints triggers inherit create_table_like typed_table vacuum drop_if_exists updatable_views # ---------- # sanity_check does a vacuum, affecting the sort order of SELECT * diff --git a/src/test/regress/serial_schedule b/src/test/regress/serial_schedule new file mode 100644 index 8576a7f..f84ba64 *** a/src/test/regress/serial_schedule --- b/src/test/regress/serial_schedule *************** test: create_table_like *** 67,72 **** --- 67,73 ---- test: typed_table test: vacuum test: drop_if_exists + test: updatable_views test: sanity_check test: errors test: select diff --git a/src/test/regress/sql/triggers.sql b/src/test/regress/sql/triggers.sql new file mode 100644 index f88cb81..ec579ec *** a/src/test/regress/sql/triggers.sql --- b/src/test/regress/sql/triggers.sql *************** DROP TABLE min_updates_test_oids; *** 611,623 **** CREATE VIEW main_view AS SELECT a, b FROM main_table; - -- Updates should fail without rules or triggers - INSERT INTO main_view VALUES (1,2); - UPDATE main_view SET b = 20 WHERE a = 50; - DELETE FROM main_view WHERE a = 50; - -- Should fail even when there are no matching rows - DELETE FROM main_view WHERE a = 51; - -- VIEW trigger function CREATE OR REPLACE FUNCTION view_trigger() RETURNS trigger LANGUAGE plpgsql AS $$ --- 611,616 ---- diff --git a/src/test/regress/sql/updatable_views.sql b/src/test/regress/sql/updatable_views.sql new file mode 100644 index ...8d2b8d7 *** a/src/test/regress/sql/updatable_views.sql --- b/src/test/regress/sql/updatable_views.sql *************** *** 0 **** --- 1,335 ---- + -- + -- UPDATABLE VIEWS + -- + + CREATE TABLE base_tbl (a int PRIMARY KEY, b text DEFAULT 'Unspecified'); + INSERT INTO base_tbl VALUES (1, 'Row 1'); + INSERT INTO base_tbl VALUES (2, 'Row 2'); + + -- non-updatable views + CREATE VIEW ro_view1 AS SELECT DISTINCT a, b FROM base_tbl; -- DISTINCT not supported + CREATE VIEW ro_view2 AS SELECT a, b FROM base_tbl GROUP BY a, b; -- GROUP BY not supported + CREATE VIEW ro_view3 AS SELECT 1 FROM base_tbl HAVING max(a) > 0; -- HAVING not supported + CREATE VIEW ro_view4 AS SELECT count(*) FROM base_tbl; -- Aggregate functions not supported + CREATE VIEW ro_view5 AS SELECT a, rank() OVER() FROM base_tbl; -- Window functions not supported + CREATE VIEW ro_view6 AS SELECT a, b FROM base_tbl UNION SELECT -a, b FROM base_tbl; -- Set ops not supported + CREATE VIEW ro_view7 AS WITH t AS (SELECT a, b FROM base_tbl) SELECT * FROM t; -- WITH [RECURSIVE] not supported + CREATE VIEW ro_view8 AS SELECT a, b FROM base_tbl ORDER BY a OFFSET 1; -- OFFSET not supported + CREATE VIEW ro_view9 AS SELECT a, b FROM base_tbl ORDER BY a LIMIT 1; -- LIMIT not supported + CREATE VIEW ro_view10 AS SELECT 1 AS a; -- No base relations + CREATE VIEW ro_view11 AS SELECT b1.a, b2.b FROM base_tbl b1, base_tbl b2; -- Multiple base relations + CREATE VIEW ro_view12 AS SELECT * FROM generate_series(1, 10) AS g(a); -- SRF in rangetable + CREATE VIEW ro_view13 AS SELECT a, b FROM (SELECT * FROM base_tbl) AS t; -- Subselect in rangetable + CREATE VIEW ro_view14 AS SELECT ctid FROM base_tbl; -- System columns not supported + CREATE VIEW ro_view15 AS SELECT a, upper(b) FROM base_tbl; -- Expression/function in targetlist + CREATE VIEW ro_view16 AS SELECT a, b, a AS aa FROM base_tbl; -- Repeated column + CREATE VIEW ro_view17 AS SELECT * FROM ro_view1; -- Base relation not updatable + + SELECT table_name, is_updatable, is_insertable_into + FROM information_schema.views + WHERE table_name LIKE 'ro_view%' + ORDER BY table_name; + + DELETE FROM ro_view1; + DELETE FROM ro_view2; + DELETE FROM ro_view3; + DELETE FROM ro_view4; + DELETE FROM ro_view5; + DELETE FROM ro_view6; + UPDATE ro_view7 SET a=a+1; + UPDATE ro_view8 SET a=a+1; + UPDATE ro_view9 SET a=a+1; + UPDATE ro_view10 SET a=a+1; + UPDATE ro_view11 SET a=a+1; + UPDATE ro_view12 SET a=a+1; + INSERT INTO ro_view13 VALUES (3, 'Row 3'); + INSERT INTO ro_view14 VALUES (null); + INSERT INTO ro_view15 VALUES (3, 'ROW 3'); + INSERT INTO ro_view16 VALUES (3, 'Row 3', 3); + INSERT INTO ro_view17 VALUES (3, 'ROW 3'); + + DROP VIEW ro_view1, ro_view2, ro_view3, ro_view4, ro_view5, + ro_view6, ro_view7, ro_view8, ro_view9, ro_view10, + ro_view11, ro_view12, ro_view13, ro_view14, ro_view15, + ro_view16, ro_view17; + + -- simple updatable view + CREATE VIEW rw_view1 AS SELECT * FROM base_tbl; + + SELECT table_name, is_updatable, is_insertable_into + FROM information_schema.views + WHERE table_name = 'rw_view1'; + + INSERT INTO rw_view1 VALUES (3, 'Row 3'); + INSERT INTO rw_view1 (a) VALUES (4); + UPDATE rw_view1 SET a=0 WHERE a=1; + DELETE FROM rw_view1 WHERE b='Row 2'; + SELECT * FROM base_tbl; + + EXPLAIN (costs off) UPDATE rw_view1 SET a=1 WHERE a=0; + EXPLAIN (costs off) DELETE FROM rw_view1 WHERE a=0; + + -- view on top of view hiding private data + INSERT INTO base_tbl VALUES (-1, 'Private data'); + CREATE VIEW rw_view2 AS SELECT * FROM rw_view1 WHERE b !~* 'private'; + + SELECT table_name, is_updatable, is_insertable_into + FROM information_schema.views + WHERE table_name = 'rw_view2'; + + SELECT * FROM base_tbl; + SELECT * FROM rw_view2; + + INSERT INTO rw_view2 VALUES (5, 'Row 5'); + INSERT INTO rw_view2 (a) VALUES (6); + SELECT * FROM rw_view2; + UPDATE rw_view2 SET b='Row 6' WHERE a=6; + DELETE FROM rw_view2 WHERE a=5; + SELECT * FROM rw_view2; + + EXPLAIN (costs off) UPDATE rw_view2 SET a=1 WHERE a=0; + EXPLAIN (costs off) DELETE FROM rw_view2 WHERE a=0; + + -- snoop on private value + CREATE FUNCTION snoop(a int, b text) + RETURNS boolean AS + $$ + BEGIN + RAISE NOTICE 'a=%, b=%', a, b; + RETURN true; + END; + $$ + LANGUAGE plpgsql COST 0.000001; + + SELECT * FROM rw_view2 WHERE snoop(a,b); + UPDATE rw_view2 SET a=a WHERE snoop(a,b); + DELETE FROM rw_view2 WHERE NOT snoop(a,b); + + EXPLAIN (costs off) SELECT * FROM rw_view2 WHERE snoop(a,b); + EXPLAIN (costs off) UPDATE rw_view2 SET a=a WHERE snoop(a,b); + EXPLAIN (costs off) DELETE FROM rw_view2 WHERE NOT snoop(a,b); + + -- security barrier view prevents snooping + ALTER VIEW rw_view2 SET (security_barrier = true); + + SELECT * FROM rw_view2 WHERE snoop(a,b); + UPDATE rw_view2 SET a=a WHERE snoop(a,b); + DELETE FROM rw_view2 WHERE NOT snoop(a,b); + + EXPLAIN (costs off) SELECT * FROM rw_view2 WHERE snoop(a,b); + EXPLAIN (costs off) UPDATE rw_view2 SET a=a WHERE snoop(a,b); + EXPLAIN (costs off) DELETE FROM rw_view2 WHERE NOT snoop(a,b); + + -- security barrier view on top of security barrier view + CREATE VIEW rw_view3 AS SELECT * FROM rw_view2 WHERE snoop(a,b); + + SELECT table_name, is_updatable, is_insertable_into + FROM information_schema.views + WHERE table_name = 'rw_view3'; + + SELECT * FROM rw_view3; + UPDATE rw_view3 SET a=a WHERE a=5; + DELETE FROM rw_view3 WHERE a=5; + + EXPLAIN (costs off) SELECT * FROM rw_view3; + EXPLAIN (costs off) UPDATE rw_view3 SET a=a WHERE a=5; + EXPLAIN (costs off) DELETE FROM rw_view3 WHERE a=5; + + DROP VIEW rw_view1, rw_view2, rw_view3; + + -- view on top of view with rules + CREATE VIEW rw_view1 AS SELECT * FROM base_tbl OFFSET 0; -- not updatable without rules/triggers + CREATE VIEW rw_view2 AS SELECT * FROM rw_view1; + + SELECT table_name, is_updatable, is_insertable_into + FROM information_schema.views + WHERE table_name LIKE 'rw_view%' + ORDER BY table_name; + + CREATE RULE rw_view1_ins_rule AS ON INSERT TO rw_view1 + DO INSTEAD INSERT INTO base_tbl VALUES (NEW.a, NEW.b) RETURNING *; + + SELECT table_name, is_updatable, is_insertable_into + FROM information_schema.views + WHERE table_name LIKE 'rw_view%' + ORDER BY table_name; + + CREATE RULE rw_view1_upd_rule AS ON UPDATE TO rw_view1 + DO INSTEAD UPDATE base_tbl SET b=NEW.b WHERE a=OLD.a RETURNING NEW.*; + + SELECT table_name, is_updatable, is_insertable_into + FROM information_schema.views + WHERE table_name LIKE 'rw_view%' + ORDER BY table_name; + + CREATE RULE rw_view1_del_rule AS ON DELETE TO rw_view1 + DO INSTEAD DELETE FROM base_tbl WHERE a=OLD.a RETURNING OLD.*; + + SELECT table_name, is_updatable, is_insertable_into + FROM information_schema.views + WHERE table_name LIKE 'rw_view%' + ORDER BY table_name; + + SELECT * FROM rw_view2; + INSERT INTO rw_view2 VALUES (7, 'Row 7') RETURNING *; + UPDATE rw_view2 SET b='Row seven' WHERE a=7 RETURNING *; + SELECT * FROM rw_view2; + DELETE FROM rw_view2 WHERE a=7 RETURNING *; + + -- view on top of view with triggers + DROP RULE rw_view1_ins_rule ON rw_view1; + DROP RULE rw_view1_upd_rule ON rw_view1; + DROP RULE rw_view1_del_rule ON rw_view1; + + SELECT table_name, is_updatable, is_insertable_into, + is_trigger_updatable, is_trigger_deletable, + is_trigger_insertable_into + FROM information_schema.views + WHERE table_name LIKE 'rw_view%' + ORDER BY table_name; + + CREATE FUNCTION rw_view1_trig_fn() + RETURNS trigger AS + $$ + BEGIN + IF TG_OP = 'INSERT' THEN + INSERT INTO base_tbl VALUES (NEW.a, NEW.b); + RETURN NEW; + ELSIF TG_OP = 'UPDATE' THEN + UPDATE base_tbl SET b=NEW.b WHERE a=OLD.a; + RETURN NEW; + ELSIF TG_OP = 'DELETE' THEN + DELETE FROM base_tbl WHERE a=OLD.a; + RETURN OLD; + END IF; + END; + $$ + LANGUAGE plpgsql; + + CREATE TRIGGER rw_view1_ins_trig INSTEAD OF INSERT ON rw_view1 + FOR EACH ROW EXECUTE PROCEDURE rw_view1_trig_fn(); + + SELECT table_name, is_updatable, is_insertable_into, + is_trigger_updatable, is_trigger_deletable, + is_trigger_insertable_into + FROM information_schema.views + WHERE table_name LIKE 'rw_view%' + ORDER BY table_name; + + CREATE TRIGGER rw_view1_upd_trig INSTEAD OF UPDATE ON rw_view1 + FOR EACH ROW EXECUTE PROCEDURE rw_view1_trig_fn(); + + SELECT table_name, is_updatable, is_insertable_into, + is_trigger_updatable, is_trigger_deletable, + is_trigger_insertable_into + FROM information_schema.views + WHERE table_name LIKE 'rw_view%' + ORDER BY table_name; + + CREATE TRIGGER rw_view1_del_trig INSTEAD OF DELETE ON rw_view1 + FOR EACH ROW EXECUTE PROCEDURE rw_view1_trig_fn(); + + SELECT table_name, is_updatable, is_insertable_into, + is_trigger_updatable, is_trigger_deletable, + is_trigger_insertable_into + FROM information_schema.views + WHERE table_name LIKE 'rw_view%' + ORDER BY table_name; + + SELECT * FROM rw_view2; + INSERT INTO rw_view2 VALUES (7, 'Row 7') RETURNING *; + UPDATE rw_view2 SET b='Row seven' WHERE a=7 RETURNING *; + SELECT * FROM rw_view2; + DELETE FROM rw_view2 WHERE a=7 RETURNING *; + + DROP VIEW rw_view1, rw_view2; + + -- test whole row from view + CREATE VIEW rw_view1 AS SELECT b AS bb, a AS aa FROM base_tbl; + + CREATE FUNCTION rw_view1_aa(x rw_view1) + RETURNS int AS $$ SELECT x.aa $$ LANGUAGE sql; + + UPDATE rw_view1 v SET bb='Updated row 6' WHERE rw_view1_aa(v)=6 + RETURNING rw_view1_aa(v), v.bb; + SELECT * FROM base_tbl; + + EXPLAIN (costs off) + UPDATE rw_view1 v SET bb='Updated row 6' WHERE rw_view1_aa(v)=6 + RETURNING rw_view1_aa(v), v.bb; + + DROP FUNCTION rw_view1_aa(rw_view1); + DROP VIEW rw_view1; + DROP TABLE base_tbl; + + -- permissions checks + CREATE USER view_user1; + CREATE USER view_user2; + + SET SESSION AUTHORIZATION view_user1; + CREATE TABLE base_tbl(a int, b text, c float); + INSERT INTO base_tbl VALUES (1, 'Row 1', 1.0); + CREATE VIEW rw_view1 AS SELECT b AS bb, c AS cc, a AS aa FROM base_tbl; + INSERT INTO rw_view1 VALUES ('Row 2', 2.0, 2); + + GRANT SELECT ON base_tbl TO view_user2; + GRANT SELECT ON rw_view1 TO view_user2; + GRANT UPDATE (a,c) ON base_tbl TO view_user2; + GRANT UPDATE (bb,cc) ON rw_view1 TO view_user2; + RESET SESSION AUTHORIZATION; + + SET SESSION AUTHORIZATION view_user2; + CREATE VIEW rw_view2 AS SELECT b AS bb, c AS cc, a AS aa FROM base_tbl; + SELECT * FROM base_tbl; -- ok + SELECT * FROM rw_view1; -- ok + SELECT * FROM rw_view2; -- ok + + INSERT INTO base_tbl VALUES (3, 'Row 3', 3.0); -- not allowed + INSERT INTO rw_view1 VALUES ('Row 3', 3.0, 3); -- not allowed + INSERT INTO rw_view2 VALUES ('Row 3', 3.0, 3); -- not allowed + + UPDATE base_tbl SET a=a, c=c; -- ok + UPDATE base_tbl SET b=b; -- not allowed + UPDATE rw_view1 SET bb=bb, cc=cc; -- ok + UPDATE rw_view1 SET aa=aa; -- not allowed + UPDATE rw_view2 SET aa=aa, cc=cc; -- ok + UPDATE rw_view2 SET bb=bb; -- not allowed + + DELETE FROM base_tbl; -- not allowed + DELETE FROM rw_view1; -- not allowed + DELETE FROM rw_view2; -- not allowed + RESET SESSION AUTHORIZATION; + + SET SESSION AUTHORIZATION view_user1; + GRANT INSERT, DELETE ON base_tbl TO view_user2; + RESET SESSION AUTHORIZATION; + + SET SESSION AUTHORIZATION view_user2; + INSERT INTO base_tbl VALUES (3, 'Row 3', 3.0); -- ok + INSERT INTO rw_view1 VALUES ('Row 4', 4.0, 4); -- not allowed + INSERT INTO rw_view2 VALUES ('Row 4', 4.0, 4); -- ok + DELETE FROM base_tbl WHERE a=1; -- ok + DELETE FROM rw_view1 WHERE aa=2; -- not allowed + DELETE FROM rw_view2 WHERE aa=2; -- ok + SELECT * FROM base_tbl; + RESET SESSION AUTHORIZATION; + + SET SESSION AUTHORIZATION view_user1; + REVOKE INSERT, DELETE ON base_tbl FROM view_user2; + GRANT INSERT, DELETE ON rw_view1 TO view_user2; + RESET SESSION AUTHORIZATION; + + SET SESSION AUTHORIZATION view_user2; + INSERT INTO base_tbl VALUES (5, 'Row 5', 5.0); -- not allowed + INSERT INTO rw_view1 VALUES ('Row 5', 5.0, 5); -- ok + INSERT INTO rw_view2 VALUES ('Row 6', 6.0, 6); -- not allowed + DELETE FROM base_tbl WHERE a=3; -- not allowed + DELETE FROM rw_view1 WHERE aa=3; -- ok + DELETE FROM rw_view2 WHERE aa=4; -- not allowed + SELECT * FROM base_tbl; + RESET SESSION AUTHORIZATION; + + DROP VIEW rw_view1, rw_view2; + DROP TABLE base_tbl; + DROP USER view_user1; + DROP USER view_user2;