From c88a90e42d6e1e33d7194183779887da81d2291f Mon Sep 17 00:00:00 2001 From: Ajin Cherian Date: Tue, 11 Oct 2022 09:15:28 -0400 Subject: [PATCH v27 3/4] Support CREATE TABLE AS SELECT INTO The main idea of replicating the CREATE TABLE AS is that we deprase the CREATE TABLE AS into a simple CREATE TABLE(without subquery) command and WAL log it after creating the table and before writing data into the table and replicate the incoming writes later as normal INSERTs. In this apporach, we don't execute the subquery on subscriber so that don't need to make sure all the objects referenced in the subquery also exists in subscriber. And This approach works for all kind of commands(e.g. CRAETE TABLE AS [SELECT][EXECUTE][VALUES]) Introducing a new type of event trigger "table_init_write". which would be fired for CREATE TABLE AS/SELECT INTO after creating the table and before any other modification. we deparse the command in the table_init_write event trigger and WAL log the deparsed json string. The walsender will send the string to subscriber. And incoming INSERTs will also be replicated. --- src/backend/commands/createas.c | 10 ++ src/backend/commands/ddl_deparse.c | 23 +++++ src/backend/commands/event_trigger.c | 172 ++++++++++++++++++++++++++++++++- src/backend/commands/publicationcmds.c | 9 ++ src/backend/tcop/utility.c | 2 + src/backend/utils/cache/evtcache.c | 2 + src/include/catalog/pg_proc.dat | 3 + src/include/commands/event_trigger.h | 4 + src/include/tcop/deparse_utility.h | 9 +- src/include/utils/evtcache.h | 3 +- 10 files changed, 231 insertions(+), 6 deletions(-) diff --git a/src/backend/commands/createas.c b/src/backend/commands/createas.c index 152c29b..0d80c6c 100644 --- a/src/backend/commands/createas.c +++ b/src/backend/commands/createas.c @@ -34,6 +34,7 @@ #include "catalog/namespace.h" #include "catalog/toasting.h" #include "commands/createas.h" +#include "commands/event_trigger.h" #include "commands/matview.h" #include "commands/prepare.h" #include "commands/tablecmds.h" @@ -143,6 +144,15 @@ create_ctas_internal(List *attrList, IntoClause *into) StoreViewQuery(intoRelationAddr.objectId, query, false); CommandCounterIncrement(); } + else + { + /* + * Fire the trigger for table_init_write after creating the table so + * that we can access the catalog info about the newly created table in + * the trigger function. + */ + EventTriggerTableInitWrite((Node *) create, intoRelationAddr); + } return intoRelationAddr; } diff --git a/src/backend/commands/ddl_deparse.c b/src/backend/commands/ddl_deparse.c index 87a2a48..6e47f61 100755 --- a/src/backend/commands/ddl_deparse.c +++ b/src/backend/commands/ddl_deparse.c @@ -8121,6 +8121,26 @@ deparse_CreateTransformStmt(Oid objectId, Node *parsetree) return createTransform; } +/* + * Deparse CREATE TABLE AS command. + * + * deparse_CreateStmt do the actual work as we deparse the final CreateStmt for + * CREATE TABLE AS command. + */ +static ObjTree * +deparse_CreateTableAsStmt(CollectedCommand *cmd) +{ + Oid objectId; + Node *parsetree; + + Assert(cmd->type == SCT_CreateTableAs); + + parsetree = cmd->d.ctas.real_create; + objectId = cmd->d.ctas.address.objectId; + + return deparse_CreateStmt(objectId, parsetree); +} + /* * Handle deparsing of simple commands. @@ -8423,6 +8443,9 @@ deparse_utility_command(CollectedCommand *cmd, bool verbose_mode) case SCT_Grant: tree = deparse_GrantStmt(cmd); break; + case SCT_CreateTableAs: + tree = deparse_CreateTableAsStmt(cmd); + break; case SCT_AlterOpFamily: tree = deparse_AlterOpFamily(cmd); break; diff --git a/src/backend/commands/event_trigger.c b/src/backend/commands/event_trigger.c index 1819ca2..4d9ed62 100644 --- a/src/backend/commands/event_trigger.c +++ b/src/backend/commands/event_trigger.c @@ -133,7 +133,8 @@ CreateEventTrigger(CreateEventTrigStmt *stmt) if (strcmp(stmt->eventname, "ddl_command_start") != 0 && strcmp(stmt->eventname, "ddl_command_end") != 0 && strcmp(stmt->eventname, "sql_drop") != 0 && - strcmp(stmt->eventname, "table_rewrite") != 0) + strcmp(stmt->eventname, "table_rewrite") != 0 && + strcmp(stmt->eventname, "table_init_write") != 0) ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), errmsg("unrecognized event name \"%s\"", @@ -159,7 +160,8 @@ CreateEventTrigger(CreateEventTrigStmt *stmt) /* Validate tag list, if any. */ if ((strcmp(stmt->eventname, "ddl_command_start") == 0 || strcmp(stmt->eventname, "ddl_command_end") == 0 || - strcmp(stmt->eventname, "sql_drop") == 0) + strcmp(stmt->eventname, "sql_drop") == 0 || + strcmp(stmt->eventname, "table_init_write") == 0) && tags != NULL) validate_ddl_tags("tag", tags); else if (strcmp(stmt->eventname, "table_rewrite") == 0 @@ -585,7 +587,8 @@ EventTriggerCommonSetup(Node *parsetree, dbgtag = CreateCommandTag(parsetree); if (event == EVT_DDLCommandStart || event == EVT_DDLCommandEnd || - event == EVT_SQLDrop) + event == EVT_SQLDrop || + event == EVT_TableInitWrite) { if (!command_tag_event_trigger_ok(dbgtag)) elog(ERROR, "unexpected command tag \"%s\"", GetCommandTagName(dbgtag)); @@ -868,6 +871,163 @@ EventTriggerTableRewrite(Node *parsetree, Oid tableOid, int reason) CommandCounterIncrement(); } + +/* + * EventTriggerTableInitWriteStart + * Prepare to receive data on an CREATE TABLE AS/SELET INTO command about + * to be executed. + */ +void +EventTriggerTableInitWriteStart(Node *parsetree) +{ + MemoryContext oldcxt; + CollectedCommand *command; + + /* ignore if event trigger context not set, or collection disabled */ + if (!currentEventTriggerState || + currentEventTriggerState->commandCollectionInhibited) + return; + + oldcxt = MemoryContextSwitchTo(currentEventTriggerState->cxt); + + command = palloc(sizeof(CollectedCommand)); + + command->type = SCT_CreateTableAs; + command->in_extension = creating_extension; + command->d.ctas.address = InvalidObjectAddress; + command->d.ctas.real_create = NULL; + command->parsetree = copyObject(parsetree); + + command->parent = currentEventTriggerState->currentCommand; + currentEventTriggerState->currentCommand = command; + + MemoryContextSwitchTo(oldcxt); +} + +/* + * EventTriggerTableInitWriteEnd + * Finish up saving an CREATE TABLE AS/SELECT INTO command. + * + * FIXME this API isn't considering the possibility that an xact/subxact is + * aborted partway through. Probably it's best to add an + * AtEOSubXact_EventTriggers() to fix this. + */ +void +EventTriggerTableInitWriteEnd(void) +{ + CollectedCommand *parent; + + /* ignore if event trigger context not set, or collection disabled */ + if (!currentEventTriggerState || + currentEventTriggerState->commandCollectionInhibited) + return; + + parent = currentEventTriggerState->currentCommand->parent; + + pfree(currentEventTriggerState->currentCommand); + + currentEventTriggerState->currentCommand = parent; +} + +/* + * publication_deparse_table_init_write + * + * Deparse the ddl table create command and log it. + */ +Datum +publication_deparse_table_init_write(PG_FUNCTION_ARGS) +{ + char relpersist; + CollectedCommand *cmd; + char *json_string; + + if (!CALLED_AS_EVENT_TRIGGER(fcinfo)) + elog(ERROR, "not fired by event trigger manager"); + + cmd = currentEventTriggerState->currentCommand; + Assert(cmd); + + relpersist = get_rel_persistence(cmd->d.simple.address.objectId); + + /* + * Do not generate wal log for commands whose target table is a temporary + * table. + * + * We will generate wal logs for unlogged tables so that unlogged tables + * can also be created and altered on the subscriber side. This makes it + * possible to directly replay the SET LOGGED command and the incoming + * rewrite message without creating a new table. + */ + if (relpersist == RELPERSISTENCE_TEMP) + return PointerGetDatum(NULL); + + /* Deparse the DDL command and WAL log it to allow decoding of the same. */ + json_string = deparse_utility_command(cmd, false); + + if (json_string != NULL) + LogLogicalDDLMessage("deparse", cmd->d.simple.address.objectId, DCT_SimpleCmd, + json_string, strlen(json_string) + 1); + + return PointerGetDatum(NULL); +} + +/* + * Fire table_init_rewrite triggers. + */ +void +EventTriggerTableInitWrite(Node *real_create, ObjectAddress address) +{ + List *runlist; + EventTriggerData trigdata; + CollectedCommand *command; + + /* + * See EventTriggerDDLCommandStart for a discussion about why event + * triggers are disabled in single user mode. + */ + if (!IsUnderPostmaster) + return; + + /* + * Also do nothing if our state isn't set up, which it won't be if there + * weren't any relevant event triggers at the start of the current DDL + * command. This test might therefore seem optional, but it's + * *necessary*, because EventTriggerCommonSetup might find triggers that + * didn't exist at the time the command started. + */ + if (!currentEventTriggerState) + return; + + /* Do nothing if no command was collected. */ + if (!currentEventTriggerState->currentCommand) + return; + + command = currentEventTriggerState->currentCommand; + + runlist = EventTriggerCommonSetup(command->parsetree, + EVT_TableInitWrite, + "table_init_write", + &trigdata); + if (runlist == NIL) + return; + + /* Set the real CreateTable statment and object address */ + command->d.ctas.real_create = real_create; + command->d.ctas.address = address; + + /* Run the triggers. */ + EventTriggerInvoke(runlist, &trigdata); + + /* Cleanup. */ + list_free(runlist); + + /* + * Make sure anything the event triggers did will be visible to the main + * command. + */ + CommandCounterIncrement(); +} + /* * Invoke each event trigger in a list of event triggers. */ @@ -1149,7 +1309,8 @@ trackDroppedObjectsNeeded(void) */ return (EventCacheLookup(EVT_SQLDrop) != NIL) || (EventCacheLookup(EVT_TableRewrite) != NIL) || - (EventCacheLookup(EVT_DDLCommandEnd) != NIL); + (EventCacheLookup(EVT_DDLCommandEnd) != NIL) || + (EventCacheLookup(EVT_TableInitWrite) != NIL); } /* @@ -1868,6 +2029,7 @@ pg_event_trigger_ddl_commands(PG_FUNCTION_ARGS) case SCT_AlterOpFamily: case SCT_CreateOpClass: case SCT_AlterTSConfig: + case SCT_CreateTableAs: { char *identity; char *type; @@ -1885,6 +2047,8 @@ pg_event_trigger_ddl_commands(PG_FUNCTION_ARGS) addr = cmd->d.createopc.address; else if (cmd->type == SCT_AlterTSConfig) addr = cmd->d.atscfg.address; + else if (cmd->type == SCT_AlterTSConfig) + addr = cmd->d.ctas.address; /* * If an object was dropped in the same command we may end diff --git a/src/backend/commands/publicationcmds.c b/src/backend/commands/publicationcmds.c index 1ba9ef4..c8dbf5b 100644 --- a/src/backend/commands/publicationcmds.c +++ b/src/backend/commands/publicationcmds.c @@ -1006,6 +1006,11 @@ CreatePublication(ParseState *pstate, CreatePublicationStmt *stmt) CMDTAG_REFRESH_MATERIALIZED_VIEW }; + CommandTag init_commands[] = { + CMDTAG_CREATE_TABLE_AS, + CMDTAG_SELECT_INTO + }; + /* Create the ddl_command_end event trigger */ CreateDDLReplicaEventTrigger("ddl_command_end", end_commands, lengthof(end_commands), myself, puboid); @@ -1017,6 +1022,10 @@ CreatePublication(ParseState *pstate, CreatePublicationStmt *stmt) /* Create the table_rewrite event trigger */ CreateDDLReplicaEventTrigger("table_rewrite", rewrite_commands, lengthof(rewrite_commands), myself, puboid); + + /* Create the table_init_write event trigger */ + CreateDDLReplicaEventTrigger("table_init_write", init_commands, + lengthof(init_commands), myself, puboid); } table_close(rel, RowExclusiveLock); diff --git a/src/backend/tcop/utility.c b/src/backend/tcop/utility.c index bea35a6..50839c0 100644 --- a/src/backend/tcop/utility.c +++ b/src/backend/tcop/utility.c @@ -1666,8 +1666,10 @@ ProcessUtilitySlow(ParseState *pstate, break; case T_CreateTableAsStmt: + EventTriggerTableInitWriteStart(parsetree); address = ExecCreateTableAs(pstate, (CreateTableAsStmt *) parsetree, params, queryEnv, qc); + EventTriggerTableInitWriteEnd(); break; case T_RefreshMatViewStmt: diff --git a/src/backend/utils/cache/evtcache.c b/src/backend/utils/cache/evtcache.c index f7f7165..7fb8cb2 100644 --- a/src/backend/utils/cache/evtcache.c +++ b/src/backend/utils/cache/evtcache.c @@ -167,6 +167,8 @@ BuildEventTriggerCache(void) event = EVT_SQLDrop; else if (strcmp(evtevent, "table_rewrite") == 0) event = EVT_TableRewrite; + else if (strcmp(evtevent, "table_init_write") == 0) + event = EVT_TableInitWrite; else continue; diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat index ce1b17f..81fbfd4 100644 --- a/src/include/catalog/pg_proc.dat +++ b/src/include/catalog/pg_proc.dat @@ -11838,4 +11838,7 @@ { oid => '4646', descr => 'trigger for ddl command deparse table rewrite', proname => 'publication_deparse_table_rewrite', prorettype => 'event_trigger', proargtypes => '', prosrc => 'publication_deparse_table_rewrite' }, +{ oid => '4647', descr => 'trigger for ddl command deparse table init', + proname => 'publication_deparse_table_init_write', prorettype => 'event_trigger', + proargtypes => '', prosrc => 'publication_deparse_table_init_write' }, ] diff --git a/src/include/commands/event_trigger.h b/src/include/commands/event_trigger.h index fd2ee7f..a9e0f71 100644 --- a/src/include/commands/event_trigger.h +++ b/src/include/commands/event_trigger.h @@ -55,6 +55,10 @@ extern void EventTriggerDDLCommandEnd(Node *parsetree); extern void EventTriggerSQLDrop(Node *parsetree); extern void EventTriggerTableRewrite(Node *parsetree, Oid tableOid, int reason); +extern void EventTriggerTableInitWriteStart(Node *parsetree); +extern void EventTriggerTableInitWrite(Node *parsetree, ObjectAddress address); +extern void EventTriggerTableInitWriteEnd(void); + extern bool EventTriggerBeginCompleteQuery(void); extern void EventTriggerEndCompleteQuery(void); extern bool trackDroppedObjectsNeeded(void); diff --git a/src/include/tcop/deparse_utility.h b/src/include/tcop/deparse_utility.h index b53294b..3d294a0 100644 --- a/src/include/tcop/deparse_utility.h +++ b/src/include/tcop/deparse_utility.h @@ -29,7 +29,8 @@ typedef enum CollectedCommandType SCT_AlterOpFamily, SCT_AlterDefaultPrivileges, SCT_CreateOpClass, - SCT_AlterTSConfig + SCT_AlterTSConfig, + SCT_CreateTableAs } CollectedCommandType; /* @@ -101,6 +102,12 @@ typedef struct CollectedCommand { ObjectType objtype; } defprivs; + + struct + { + ObjectAddress address; + Node *real_create; + } ctas; } d; struct CollectedCommand *parent; /* when nested */ diff --git a/src/include/utils/evtcache.h b/src/include/utils/evtcache.h index ddb67a6..1e64831 100644 --- a/src/include/utils/evtcache.h +++ b/src/include/utils/evtcache.h @@ -22,7 +22,8 @@ typedef enum EVT_DDLCommandStart, EVT_DDLCommandEnd, EVT_SQLDrop, - EVT_TableRewrite + EVT_TableRewrite, + EVT_TableInitWrite } EventTriggerEvent; typedef struct -- 1.8.3.1