diff --git a/src/backend/catalog/pg_inherits.c b/src/backend/catalog/pg_inherits.c index 00f2ae0..348e8e9 100644 --- a/src/backend/catalog/pg_inherits.c +++ b/src/backend/catalog/pg_inherits.c @@ -32,7 +32,121 @@ #include "utils/tqual.h" static int oid_cmp(const void *p1, const void *p2); +static List *check_rel_entries(Oid *oidarr, int numoids, LOCKMODE lockmode); +/* + * check_rel_entries + * + * Check the existence of relation entries scanned in pg_inherits and + * generate a list removing the ones that are deleted. + */ +static List * +check_rel_entries(Oid *oidarr, int numoids, LOCKMODE lockmode) +{ + int i; + List *list = NIL; + + /* + * If we found more than one element, be it parent or child, sort them + * by OID. This ensures reasonably consistent behavior regardless of + * the vagaries of an indexscan. This is important since we need to + * be sure all backends lock children in the same order to avoid needless + * deadlocks. + */ + if (numoids > 1) + qsort(oidarr, numoids, sizeof(Oid), oid_cmp); + + /* + * Acquire locks and build the result list. + */ + for (i = 0; i < numoids; i++) + { + Oid relid; + + relid = oidarr[i]; + + if (lockmode != NoLock) + { + /* Get the lock to synchronize against concurrent drop */ + LockRelationOid(relid, lockmode); + + /* + * Now that we have the lock, double-check to see if the relation + * really exists or not. If not, assume it was dropped while we + * waited to acquire lock, and ignore it. + */ + if (!SearchSysCacheExists1(RELOID, ObjectIdGetDatum(relid))) + { + /* Release useless lock */ + UnlockRelationOid(relid, lockmode); + /* And ignore this relation */ + continue; + } + } + + list = lappend_oid(list, relid); + } + + return list; +} + +/* + * find_inheritance_parents + * + * Returns a list containing the OIDs of all the relations that are direct + * parents of the given relation. + */ +List * +find_inheritance_parents(Oid childrelid, LOCKMODE lockmode) +{ + List *list = NIL; + Relation relation; + SysScanDesc scan; + ScanKeyData key[1]; + HeapTuple inheritsTuple; + Oid *oidarr; + int maxoids, + numoids; + + /* + * Scan pg_inherits and build a working array of subclass OIDs. + */ + maxoids = 32; + oidarr = (Oid *) palloc(maxoids * sizeof(Oid)); + numoids = 0; + + relation = heap_open(InheritsRelationId, AccessShareLock); + + ScanKeyInit(&key[0], + Anum_pg_inherits_inhrelid, + BTEqualStrategyNumber, F_OIDEQ, + ObjectIdGetDatum(childrelid)); + + scan = systable_beginscan(relation, InheritsRelidIndexId, true, + NULL, 1, key); + + while ((inheritsTuple = systable_getnext(scan)) != NULL) + { + Oid parentOid = ((Form_pg_inherits) GETSTRUCT(inheritsTuple))->inhparent; + if (numoids >= maxoids) + { + maxoids *= 2; + oidarr = (Oid *) repalloc(oidarr, maxoids * sizeof(Oid)); + } + oidarr[numoids++] = parentOid; + } + + systable_endscan(scan); + + heap_close(relation, AccessShareLock); + + /* remove relations potentially already deleted */ + list = check_rel_entries(oidarr, numoids, lockmode); + + pfree(oidarr); + + return list; +} /* * find_inheritance_children @@ -56,8 +170,7 @@ find_inheritance_children(Oid parentrelId, LOCKMODE lockmode) Oid inhrelid; Oid *oidarr; int maxoids, - numoids, - i; + numoids; /* * Can skip the scan if pg_class shows the relation has never had a @@ -98,43 +211,8 @@ find_inheritance_children(Oid parentrelId, LOCKMODE lockmode) heap_close(relation, AccessShareLock); - /* - * If we found more than one child, sort them by OID. This ensures - * reasonably consistent behavior regardless of the vagaries of an - * indexscan. This is important since we need to be sure all backends - * lock children in the same order to avoid needless deadlocks. - */ - if (numoids > 1) - qsort(oidarr, numoids, sizeof(Oid), oid_cmp); - - /* - * Acquire locks and build the result list. - */ - for (i = 0; i < numoids; i++) - { - inhrelid = oidarr[i]; - - if (lockmode != NoLock) - { - /* Get the lock to synchronize against concurrent drop */ - LockRelationOid(inhrelid, lockmode); - - /* - * Now that we have the lock, double-check to see if the relation - * really exists or not. If not, assume it was dropped while we - * waited to acquire lock, and ignore it. - */ - if (!SearchSysCacheExists1(RELOID, ObjectIdGetDatum(inhrelid))) - { - /* Release useless lock */ - UnlockRelationOid(inhrelid, lockmode); - /* And ignore this relation */ - continue; - } - } - - list = lappend_oid(list, inhrelid); - } + /* remove relations potentially already deleted */ + list = check_rel_entries(oidarr, numoids, lockmode); pfree(oidarr); diff --git a/src/backend/commands/tablecmds.c b/src/backend/commands/tablecmds.c index 86e9814..cc13982 100644 --- a/src/backend/commands/tablecmds.c +++ b/src/backend/commands/tablecmds.c @@ -5180,8 +5180,8 @@ ATExecDropNotNull(Relation rel, const char *colName, LOCKMODE lockmode) HeapTuple tuple; AttrNumber attnum; Relation attr_rel; - List *indexoidlist; - ListCell *indexoidscan; + List *oidlist; + ListCell *oidscan; ObjectAddress address; /* @@ -5213,11 +5213,11 @@ ATExecDropNotNull(Relation rel, const char *colName, LOCKMODE lockmode) */ /* Loop over all indexes on the relation */ - indexoidlist = RelationGetIndexList(rel); + oidlist = RelationGetIndexList(rel); - foreach(indexoidscan, indexoidlist) + foreach(oidscan, oidlist) { - Oid indexoid = lfirst_oid(indexoidscan); + Oid indexoid = lfirst_oid(oidscan); HeapTuple indexTuple; Form_pg_index indexStruct; int i; @@ -5247,7 +5247,45 @@ ATExecDropNotNull(Relation rel, const char *colName, LOCKMODE lockmode) ReleaseSysCache(indexTuple); } - list_free(indexoidlist); + list_free(oidlist); + + /* + * Check if parent relations have a NOT NULL constraint on this column + * and throw an error if that is the case. All the direct parent + * relations found in pg_inherits are checked. + */ + oidlist = find_inheritance_parents(RelationGetRelid(rel), lockmode); + + foreach(oidscan, oidlist) + { + Oid parentoid = lfirst_oid(oidscan); + HeapTuple tuple; + Relation attr_rel; + Form_pg_attribute parentatt; + + attr_rel = heap_open(AttributeRelationId, RowExclusiveLock); + + tuple = SearchSysCacheCopyAttName(parentoid, colName); + if (!HeapTupleIsValid(tuple)) /* shouldn't happen */ + elog(ERROR, "cache lookup failed for attribute \"%s\" of relation %u", + colName, parentoid); + + parentatt = (Form_pg_attribute) GETSTRUCT(tuple); + + if (parentatt->attnotnull) + ereport(ERROR, + (errcode(ERRCODE_INVALID_TABLE_DEFINITION), + errmsg("cannot drop NOT NULL constraint for attribute \"%s\"", + colName), + errdetail("The same column in parent relation \"%s\" is marked NOT NULL.", + get_rel_name(parentoid)))); + + heap_freetuple(tuple); + + heap_close(attr_rel, NoLock); + } + + list_free(oidlist); /* * Okay, actually perform the catalog change ... if needed diff --git a/src/include/catalog/indexing.h b/src/include/catalog/indexing.h index ca5eb3d..dffa15f 100644 --- a/src/include/catalog/indexing.h +++ b/src/include/catalog/indexing.h @@ -161,6 +161,8 @@ DECLARE_UNIQUE_INDEX(pg_inherits_relid_seqno_index, 2680, on pg_inherits using b #define InheritsRelidSeqnoIndexId 2680 DECLARE_INDEX(pg_inherits_parent_index, 2187, on pg_inherits using btree(inhparent oid_ops)); #define InheritsParentIndexId 2187 +DECLARE_INDEX(pg_inherits_relid_index, 3343, on pg_inherits using btree(inhrelid oid_ops)); +#define InheritsRelidIndexId 3343 DECLARE_UNIQUE_INDEX(pg_init_privs_o_c_o_index, 3395, on pg_init_privs using btree(objoid oid_ops, classoid oid_ops, objsubid int4_ops)); #define InitPrivsObjIndexId 3395 diff --git a/src/include/catalog/pg_inherits_fn.h b/src/include/catalog/pg_inherits_fn.h index a717108..bb924ed 100644 --- a/src/include/catalog/pg_inherits_fn.h +++ b/src/include/catalog/pg_inherits_fn.h @@ -17,6 +17,7 @@ #include "nodes/pg_list.h" #include "storage/lock.h" +extern List *find_inheritance_parents(Oid childrelid, LOCKMODE lockmode); extern List *find_inheritance_children(Oid parentrelId, LOCKMODE lockmode); extern List *find_all_inheritors(Oid parentrelId, LOCKMODE lockmode, List **parents); diff --git a/src/test/regress/expected/alter_table.out b/src/test/regress/expected/alter_table.out index 3232cda..026233c 100644 --- a/src/test/regress/expected/alter_table.out +++ b/src/test/regress/expected/alter_table.out @@ -2914,3 +2914,15 @@ Table "public.test_add_column" c4 | integer | DROP TABLE test_add_column; +-- tests for NOT NULL with inherited tables +CREATE TABLE parent_drop_null (id int NOT NULL); +CREATE TABLE child_drop_null (id int); +ALTER TABLE child_drop_null INHERIT parent_drop_null; -- error +ERROR: column "id" in child table must be marked NOT NULL +ALTER TABLE child_drop_null ALTER COLUMN id SET NOT NULL; +ALTER TABLE child_drop_null INHERIT parent_drop_null; -- ok +ALTER TABLE child_drop_null ALTER COLUMN id DROP NOT NULL; -- error +ERROR: cannot drop NOT NULL constraint for attribute "id" +DETAIL: The same column in parent relation "parent_drop_null" is marked NOT NULL. +DROP TABLE child_drop_null; +DROP TABLE parent_drop_null; diff --git a/src/test/regress/sql/alter_table.sql b/src/test/regress/sql/alter_table.sql index 72e65d4..bd40297 100644 --- a/src/test/regress/sql/alter_table.sql +++ b/src/test/regress/sql/alter_table.sql @@ -1842,3 +1842,13 @@ ALTER TABLE test_add_column ADD COLUMN c4 integer; \d test_add_column DROP TABLE test_add_column; + +-- tests for NOT NULL with inherited tables +CREATE TABLE parent_drop_null (id int NOT NULL); +CREATE TABLE child_drop_null (id int); +ALTER TABLE child_drop_null INHERIT parent_drop_null; -- error +ALTER TABLE child_drop_null ALTER COLUMN id SET NOT NULL; +ALTER TABLE child_drop_null INHERIT parent_drop_null; -- ok +ALTER TABLE child_drop_null ALTER COLUMN id DROP NOT NULL; -- error +DROP TABLE child_drop_null; +DROP TABLE parent_drop_null;