From b5c08d34cb1ba1275d7a175cc82d86d2fef759c9 Mon Sep 17 00:00:00 2001 From: Dave Cramer Date: Tue, 11 Jun 2019 15:35:11 -0400 Subject: [PATCH 1/7] First pass at working code without subscription options --- src/backend/replication/logical/proto.c | 108 ++++++++++------- src/backend/replication/logical/worker.c | 124 ++++++++++++++------ src/backend/replication/pgoutput/pgoutput.c | 34 +++++- src/include/replication/logicalproto.h | 20 ++-- src/include/replication/pgoutput.h | 1 + 5 files changed, 201 insertions(+), 86 deletions(-) diff --git a/src/backend/replication/logical/proto.c b/src/backend/replication/logical/proto.c index e7df47de3e..8859aa8e79 100644 --- a/src/backend/replication/logical/proto.c +++ b/src/backend/replication/logical/proto.c @@ -16,9 +16,11 @@ #include "catalog/pg_namespace.h" #include "catalog/pg_type.h" #include "libpq/pqformat.h" +#include "replication/logicalrelation.h" #include "replication/logicalproto.h" #include "utils/builtins.h" #include "utils/lsyscache.h" +#include "utils/rel.h" #include "utils/syscache.h" /* @@ -31,7 +33,7 @@ static void logicalrep_write_attrs(StringInfo out, Relation rel); static void logicalrep_write_tuple(StringInfo out, Relation rel, - HeapTuple tuple); + HeapTuple tuple, bool binary_basetypes); static void logicalrep_read_attrs(StringInfo in, LogicalRepRelation *rel); static void logicalrep_read_tuple(StringInfo in, LogicalRepTupleData *tuple); @@ -139,7 +141,7 @@ logicalrep_read_origin(StringInfo in, XLogRecPtr *origin_lsn) * Write INSERT to the output stream. */ void -logicalrep_write_insert(StringInfo out, Relation rel, HeapTuple newtuple) +logicalrep_write_insert(StringInfo out, Relation rel, HeapTuple newtuple, bool binary_basetypes) { pq_sendbyte(out, 'I'); /* action INSERT */ @@ -151,7 +153,7 @@ logicalrep_write_insert(StringInfo out, Relation rel, HeapTuple newtuple) pq_sendint32(out, RelationGetRelid(rel)); pq_sendbyte(out, 'N'); /* new tuple follows */ - logicalrep_write_tuple(out, rel, newtuple); + logicalrep_write_tuple(out, rel, newtuple, binary_basetypes); } /* @@ -159,14 +161,10 @@ logicalrep_write_insert(StringInfo out, Relation rel, HeapTuple newtuple) * * Fills the new tuple. */ -LogicalRepRelId +void logicalrep_read_insert(StringInfo in, LogicalRepTupleData *newtup) { char action; - LogicalRepRelId relid; - - /* read the relation id */ - relid = pq_getmsgint(in, 4); action = pq_getmsgbyte(in); if (action != 'N') @@ -175,7 +173,6 @@ logicalrep_read_insert(StringInfo in, LogicalRepTupleData *newtup) logicalrep_read_tuple(in, newtup); - return relid; } /* @@ -183,7 +180,7 @@ logicalrep_read_insert(StringInfo in, LogicalRepTupleData *newtup) */ void logicalrep_write_update(StringInfo out, Relation rel, HeapTuple oldtuple, - HeapTuple newtuple) + HeapTuple newtuple, bool binary_basetypes) { pq_sendbyte(out, 'U'); /* action UPDATE */ @@ -200,26 +197,22 @@ logicalrep_write_update(StringInfo out, Relation rel, HeapTuple oldtuple, pq_sendbyte(out, 'O'); /* old tuple follows */ else pq_sendbyte(out, 'K'); /* old key follows */ - logicalrep_write_tuple(out, rel, oldtuple); + logicalrep_write_tuple(out, rel, oldtuple, binary_basetypes); } pq_sendbyte(out, 'N'); /* new tuple follows */ - logicalrep_write_tuple(out, rel, newtuple); + logicalrep_write_tuple(out, rel, newtuple, binary_basetypes); } /* * Read UPDATE from stream. */ -LogicalRepRelId +void logicalrep_read_update(StringInfo in, bool *has_oldtuple, LogicalRepTupleData *oldtup, LogicalRepTupleData *newtup) { char action; - LogicalRepRelId relid; - - /* read the relation id */ - relid = pq_getmsgint(in, 4); /* read and verify action */ action = pq_getmsgbyte(in); @@ -245,14 +238,13 @@ logicalrep_read_update(StringInfo in, bool *has_oldtuple, logicalrep_read_tuple(in, newtup); - return relid; } /* * Write DELETE to the output stream. */ void -logicalrep_write_delete(StringInfo out, Relation rel, HeapTuple oldtuple) +logicalrep_write_delete(StringInfo out, Relation rel, HeapTuple oldtuple, bool binary_basetypes) { Assert(rel->rd_rel->relreplident == REPLICA_IDENTITY_DEFAULT || rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL || @@ -268,7 +260,7 @@ logicalrep_write_delete(StringInfo out, Relation rel, HeapTuple oldtuple) else pq_sendbyte(out, 'K'); /* old key follows */ - logicalrep_write_tuple(out, rel, oldtuple); + logicalrep_write_tuple(out, rel, oldtuple, binary_basetypes); } /* @@ -276,14 +268,10 @@ logicalrep_write_delete(StringInfo out, Relation rel, HeapTuple oldtuple) * * Fills the old tuple. */ -LogicalRepRelId +void logicalrep_read_delete(StringInfo in, LogicalRepTupleData *oldtup) { char action; - LogicalRepRelId relid; - - /* read the relation id */ - relid = pq_getmsgint(in, 4); /* read and verify action */ action = pq_getmsgbyte(in); @@ -292,7 +280,6 @@ logicalrep_read_delete(StringInfo in, LogicalRepTupleData *oldtup) logicalrep_read_tuple(in, oldtup); - return relid; } /* @@ -441,7 +428,7 @@ logicalrep_read_typ(StringInfo in, LogicalRepTyp *ltyp) * Write a tuple to the outputstream, in the most efficient format possible. */ static void -logicalrep_write_tuple(StringInfo out, Relation rel, HeapTuple tuple) +logicalrep_write_tuple(StringInfo out, Relation rel, HeapTuple tuple, bool binary_basetypes) { TupleDesc desc; Datum values[MaxTupleAttributeNumber]; @@ -457,6 +444,7 @@ logicalrep_write_tuple(StringInfo out, Relation rel, HeapTuple tuple) continue; nliveatts++; } + pq_sendint16(out, nliveatts); /* try to allocate enough memory from the get-go */ @@ -492,12 +480,31 @@ logicalrep_write_tuple(StringInfo out, Relation rel, HeapTuple tuple) elog(ERROR, "cache lookup failed for type %u", att->atttypid); typclass = (Form_pg_type) GETSTRUCT(typtup); - pq_sendbyte(out, 't'); /* 'text' data follows */ - outputstr = OidOutputFunctionCall(typclass->typoutput, values[i]); - pq_sendcountedtext(out, outputstr, strlen(outputstr), false); - pfree(outputstr); + if (binary_basetypes && + OidIsValid(typclass->typreceive) && + (att->atttypid < FirstNormalObjectId || typclass->typtype != 'c') && + (att->atttypid < FirstNormalObjectId || typclass->typelem == InvalidOid)) + { + bytea *outputbytes; + int len; + pq_sendbyte(out, 'b'); /* binary send/recv data follows */ + + outputbytes = OidSendFunctionCall(typclass->typsend, + values[i]); + len = VARSIZE(outputbytes) - VARHDRSZ; + pq_sendint(out, len, 4); /* length */ + pq_sendbytes(out, VARDATA(outputbytes), len); /* data */ + pfree(outputbytes); + } + else + { + pq_sendbyte(out, 't'); /* 'text' data follows */ + outputstr = OidOutputFunctionCall(typclass->typoutput, values[i]); + pq_sendcountedtext(out, outputstr, strlen(outputstr), false); + pfree(outputstr); + } ReleaseSysCache(typtup); } } @@ -518,6 +525,9 @@ logicalrep_read_tuple(StringInfo in, LogicalRepTupleData *tuple) memset(tuple->changed, 0, sizeof(tuple->changed)); + /* default is text */ + memset(tuple->binary, 0, sizeof(tuple->binary)); + /* Read the data */ for (i = 0; i < natts; i++) { @@ -528,25 +538,43 @@ logicalrep_read_tuple(StringInfo in, LogicalRepTupleData *tuple) switch (kind) { case 'n': /* null */ - tuple->values[i] = NULL; + tuple->values[i].len = 0; + tuple->values[i].data = NULL; tuple->changed[i] = true; break; case 'u': /* unchanged column */ /* we don't receive the value of an unchanged column */ - tuple->values[i] = NULL; + tuple->values[i].len = 0; + tuple->values[i].data = NULL; break; - case 't': /* text formatted value */ + case 'b': /* binary formatted value */ { - int len; - tuple->changed[i] = true; + tuple->binary[i] = true; + + int len = pq_getmsgint(in, 4); /* read length */ - len = pq_getmsgint(in, 4); /* read length */ + tuple->values[i].data = palloc(len + 1); + /* and data */ + + pq_copymsgbytes(in, tuple->values[i].data, len); + tuple->values[i].len = len; + tuple->values[i].cursor = 0; + tuple->values[i].maxlen = len; + /* not strictly necessary but the docs say it is required */ + tuple->values[i].data[len] = '\0'; + break; + } + case 't': /* text formatted value */ + { + tuple->changed[i] = true; + int len = pq_getmsgint(in, 4); /* read length */ /* and data */ - tuple->values[i] = palloc(len + 1); - pq_copymsgbytes(in, tuple->values[i], len); - tuple->values[i][len] = '\0'; + tuple->values[i].data = palloc(len + 1); + pq_copymsgbytes(in, tuple->values[i].data, len); + tuple->values[i].data[len] = '\0'; + tuple->values[i].len = len; } break; default: diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index ced0d191c2..9a70649134 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -296,13 +296,12 @@ slot_store_error_callback(void *arg) } /* - * Store data in C string form into slot. - * This is similar to BuildTupleFromCStrings but TupleTableSlot fits our - * use better. + * Store data into slot. + * Data can be either text or binary transfer format */ static void -slot_store_cstrings(TupleTableSlot *slot, LogicalRepRelMapEntry *rel, - char **values) +slot_store_data(TupleTableSlot *slot, LogicalRepRelMapEntry *rel, + LogicalRepTupleData *tupleData) { int natts = slot->tts_tupleDescriptor->natts; int i; @@ -327,18 +326,40 @@ slot_store_cstrings(TupleTableSlot *slot, LogicalRepRelMapEntry *rel, int remoteattnum = rel->attrmap[i]; if (!att->attisdropped && remoteattnum >= 0 && - values[remoteattnum] != NULL) + tupleData->values[remoteattnum].data != NULL) { - Oid typinput; - Oid typioparam; errarg.local_attnum = i; errarg.remote_attnum = remoteattnum; - getTypeInputInfo(att->atttypid, &typinput, &typioparam); - slot->tts_values[i] = - OidInputFunctionCall(typinput, values[remoteattnum], - typioparam, att->atttypmod); + if (tupleData->binary[remoteattnum]) + { + Oid typreceive; + Oid typioparam; + + getTypeBinaryInputInfo(att->atttypid, &typreceive, &typioparam); + + int cursor = tupleData->values[remoteattnum].cursor; + slot->tts_values[i] = + OidReceiveFunctionCall(typreceive, &tupleData->values[remoteattnum], + typioparam, att->atttypmod); + /* + * Do not advance the cursor in case we need to re-read this + * This saves us from pushing all of this type logic into proto.c + */ + tupleData->values[remoteattnum].cursor = cursor; + + } + else + { + Oid typinput; + Oid typioparam; + + getTypeInputInfo(att->atttypid, &typinput, &typioparam); + slot->tts_values[i] = + OidInputFunctionCall(typinput, tupleData->values[remoteattnum].data, + typioparam, att->atttypmod); + } slot->tts_isnull[i] = false; errarg.local_attnum = -1; @@ -372,11 +393,17 @@ slot_store_cstrings(TupleTableSlot *slot, LogicalRepRelMapEntry *rel, * Caution: unreplaced pass-by-ref columns in "slot" will point into the * storage for "srcslot". This is OK for current usage, but someday we may * need to materialize "slot" at the end to make it independent of "srcslot". + * + * TODO: figure out the right comment here + * Modify slot with user data provided. + * This is somewhat similar to heap_modify_tuple but also calls the type + * input function on the user data as the input is either text or binary transfer + * format */ static void -slot_modify_cstrings(TupleTableSlot *slot, TupleTableSlot *srcslot, - LogicalRepRelMapEntry *rel, - char **values, bool *replaces) +slot_modify_data(TupleTableSlot *slot, TupleTableSlot *srcslot, + LogicalRepRelMapEntry *rel, + LogicalRepTupleData *tupleData) { int natts = slot->tts_tupleDescriptor->natts; int i; @@ -413,21 +440,43 @@ slot_modify_cstrings(TupleTableSlot *slot, TupleTableSlot *srcslot, if (remoteattnum < 0) continue; - if (!replaces[remoteattnum]) + if (!tupleData->changed[remoteattnum]) continue; - if (values[remoteattnum] != NULL) + if (tupleData->values[remoteattnum].data != NULL) { - Oid typinput; - Oid typioparam; errarg.local_attnum = i; errarg.remote_attnum = remoteattnum; - getTypeInputInfo(att->atttypid, &typinput, &typioparam); - slot->tts_values[i] = - OidInputFunctionCall(typinput, values[remoteattnum], - typioparam, att->atttypmod); + if (tupleData->binary[remoteattnum]) + { + Oid typreceive; + Oid typioparam; + + getTypeBinaryInputInfo(att->atttypid, &typreceive, &typioparam); + + + int cursor = tupleData->values[remoteattnum].cursor; + slot->tts_values[i] = + OidReceiveFunctionCall(typreceive, &tupleData->values[remoteattnum], + typioparam, att->atttypmod); + /* + * Do not advance the cursor in case we need to re-read this + * This saves us from pushing all of this type logic into proto.c + */ + tupleData->values[remoteattnum].cursor = cursor; + } + else + { + Oid typinput; + Oid typioparam; + + getTypeInputInfo(att->atttypid, &typinput, &typioparam); + slot->tts_values[i] = + OidInputFunctionCall(typinput, tupleData->values[remoteattnum].data, + typioparam, att->atttypmod); + } slot->tts_isnull[i] = false; errarg.local_attnum = -1; @@ -592,8 +641,12 @@ apply_handle_insert(StringInfo s) ensure_transaction(); - relid = logicalrep_read_insert(s, &newtup); + /* read the relation id */ + relid = pq_getmsgint(s, 4); rel = logicalrep_rel_open(relid, RowExclusiveLock); + + logicalrep_read_insert(s, &newtup); + if (!should_apply_changes_for_rel(rel)) { /* @@ -615,7 +668,7 @@ apply_handle_insert(StringInfo s) /* Process and store remote tuple in the slot */ oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate)); - slot_store_cstrings(remoteslot, rel, newtup.values); + slot_store_data(remoteslot, rel, &newtup); slot_fill_defaults(rel, estate, remoteslot); MemoryContextSwitchTo(oldctx); @@ -695,9 +748,12 @@ apply_handle_update(StringInfo s) ensure_transaction(); - relid = logicalrep_read_update(s, &has_oldtup, &oldtup, - &newtup); + /* read the relation id */ + relid = pq_getmsgint(s, 4); rel = logicalrep_rel_open(relid, RowExclusiveLock); + + logicalrep_read_update(s, &has_oldtup, &oldtup, + &newtup); if (!should_apply_changes_for_rel(rel)) { /* @@ -725,8 +781,8 @@ apply_handle_update(StringInfo s) /* Build the search tuple. */ oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate)); - slot_store_cstrings(remoteslot, rel, - has_oldtup ? oldtup.values : newtup.values); + slot_store_data(remoteslot, rel, + has_oldtup ? &oldtup : &newtup); MemoryContextSwitchTo(oldctx); /* @@ -756,8 +812,7 @@ apply_handle_update(StringInfo s) { /* Process and store remote tuple in the slot */ oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate)); - slot_modify_cstrings(remoteslot, localslot, rel, - newtup.values, newtup.changed); + slot_modify_data(remoteslot, localslot, rel, &newtup); MemoryContextSwitchTo(oldctx); EvalPlanQualSetSlot(&epqstate, remoteslot); @@ -815,8 +870,11 @@ apply_handle_delete(StringInfo s) ensure_transaction(); - relid = logicalrep_read_delete(s, &oldtup); + /* read the relation id */ + relid = pq_getmsgint(s, 4); rel = logicalrep_rel_open(relid, RowExclusiveLock); + + logicalrep_read_delete(s, &oldtup); if (!should_apply_changes_for_rel(rel)) { /* @@ -844,7 +902,7 @@ apply_handle_delete(StringInfo s) /* Find the tuple using the replica identity index. */ oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate)); - slot_store_cstrings(remoteslot, rel, oldtup.values); + slot_store_data(remoteslot, rel, &oldtup); MemoryContextSwitchTo(oldctx); /* diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c index 3483c1b877..2cb8624e9e 100644 --- a/src/backend/replication/pgoutput/pgoutput.c +++ b/src/backend/replication/pgoutput/pgoutput.c @@ -87,11 +87,15 @@ _PG_output_plugin_init(OutputPluginCallbacks *cb) static void parse_output_parameters(List *options, uint32 *protocol_version, - List **publication_names) + List **publication_names, bool *binary_basetypes) { ListCell *lc; bool protocol_version_given = false; bool publication_names_given = false; + bool binary_option_given = false; + + // default to false + *binary_basetypes = false; foreach(lc, options) { @@ -137,6 +141,22 @@ parse_output_parameters(List *options, uint32 *protocol_version, (errcode(ERRCODE_INVALID_NAME), errmsg("invalid publication_names syntax"))); } + else if (strcmp(defel->defname, "binary") == 0 ) + { + bool parsed; + if (binary_option_given) + ereport(ERROR, + (errcode(ERRCODE_SYNTAX_ERROR), + errmsg("conflicting or redundant options"))); + binary_option_given = true; + + if (!parse_bool(strVal(defel->arg), &parsed)) + ereport(ERROR, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("invalid binary option"))); + + *binary_basetypes = parsed; + } else elog(ERROR, "unrecognized pgoutput option: %s", defel->defname); } @@ -171,7 +191,8 @@ pgoutput_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt, /* Parse the params and ERROR if we see any we don't recognize */ parse_output_parameters(ctx->output_plugin_options, &data->protocol_version, - &data->publication_names); + &data->publication_names, + &data->binary_basetypes); /* Check if we support requested protocol */ if (data->protocol_version > LOGICALREP_PROTO_VERSION_NUM) @@ -343,7 +364,8 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, case REORDER_BUFFER_CHANGE_INSERT: OutputPluginPrepareWrite(ctx, true); logicalrep_write_insert(ctx->out, relation, - &change->data.tp.newtuple->tuple); + &change->data.tp.newtuple->tuple, + data->binary_basetypes); OutputPluginWrite(ctx, true); break; case REORDER_BUFFER_CHANGE_UPDATE: @@ -353,7 +375,8 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, OutputPluginPrepareWrite(ctx, true); logicalrep_write_update(ctx->out, relation, oldtuple, - &change->data.tp.newtuple->tuple); + &change->data.tp.newtuple->tuple, + data->binary_basetypes); OutputPluginWrite(ctx, true); break; } @@ -362,7 +385,8 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, { OutputPluginPrepareWrite(ctx, true); logicalrep_write_delete(ctx->out, relation, - &change->data.tp.oldtuple->tuple); + &change->data.tp.oldtuple->tuple, + data->binary_basetypes); OutputPluginWrite(ctx, true); } else diff --git a/src/include/replication/logicalproto.h b/src/include/replication/logicalproto.h index 3fc430af01..4d1b7e8c4f 100644 --- a/src/include/replication/logicalproto.h +++ b/src/include/replication/logicalproto.h @@ -30,8 +30,12 @@ /* Tuple coming via logical replication. */ typedef struct LogicalRepTupleData { - /* column values in text format, or NULL for a null value: */ - char *values[MaxTupleAttributeNumber]; + /* column values */ + StringInfoData values[MaxTupleAttributeNumber]; + + /* markers for binary */ + bool binary[MaxTupleAttributeNumber]; + /* markers for changed/unchanged column values: */ bool changed[MaxTupleAttributeNumber]; } LogicalRepTupleData; @@ -86,16 +90,16 @@ extern void logicalrep_write_origin(StringInfo out, const char *origin, XLogRecPtr origin_lsn); extern char *logicalrep_read_origin(StringInfo in, XLogRecPtr *origin_lsn); extern void logicalrep_write_insert(StringInfo out, Relation rel, - HeapTuple newtuple); -extern LogicalRepRelId logicalrep_read_insert(StringInfo in, LogicalRepTupleData *newtup); + HeapTuple newtuple, bool binary_basetypes); +extern void logicalrep_read_insert(StringInfo in, LogicalRepTupleData *newtup); extern void logicalrep_write_update(StringInfo out, Relation rel, HeapTuple oldtuple, - HeapTuple newtuple); -extern LogicalRepRelId logicalrep_read_update(StringInfo in, + HeapTuple newtuple, bool binary_basetypes); +extern void logicalrep_read_update(StringInfo in, bool *has_oldtuple, LogicalRepTupleData *oldtup, LogicalRepTupleData *newtup); extern void logicalrep_write_delete(StringInfo out, Relation rel, - HeapTuple oldtuple); -extern LogicalRepRelId logicalrep_read_delete(StringInfo in, + HeapTuple oldtuple, bool binary_basetypes); +extern void logicalrep_read_delete(StringInfo in, LogicalRepTupleData *oldtup); extern void logicalrep_write_truncate(StringInfo out, int nrelids, Oid relids[], bool cascade, bool restart_seqs); diff --git a/src/include/replication/pgoutput.h b/src/include/replication/pgoutput.h index 8870721bcd..933038f385 100644 --- a/src/include/replication/pgoutput.h +++ b/src/include/replication/pgoutput.h @@ -25,6 +25,7 @@ typedef struct PGOutputData List *publication_names; List *publications; + bool binary_basetypes; } PGOutputData; #endif /* PGOUTPUT_H */ -- 2.20.1 (Apple Git-117)