diff --git a/doc/src/sgml/monitoring.sgml b/doc/src/sgml/monitoring.sgml index 44a5985..8d3bc7d 100644 --- a/doc/src/sgml/monitoring.sgml +++ b/doc/src/sgml/monitoring.sgml @@ -2588,6 +2588,39 @@ SELECT pid, wait_event_type, wait_event FROM pg_stat_activity WHERE wait_event i Amount of decoded transaction data spilled to disk. + + + + stream_txns bigint + + + Number of in-progress transactions streamed to subscriber after + memory used by logical decoding exceeds logical_decoding_work_mem. + Streaming only works with toplevel transactions (subtransactions can't + be streamed independently), so the counter does not get incremented for + subtransactions. + + + + + + stream_count bigint + + + Number of times in-progress transactions were streamed to subscriber. + Transactions may get streamed repeatedly, and this counter gets incremented + on every such invocation. + + + + + + stream_bytes bigint + + + Amount of decoded in-progress transaction data streamed to subscriber. + + diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql index 4ab14eb..042278e 100644 --- a/src/backend/catalog/system_views.sql +++ b/src/backend/catalog/system_views.sql @@ -795,7 +795,10 @@ CREATE VIEW pg_stat_replication_slots AS s.name, s.spill_txns, s.spill_count, - s.spill_bytes + s.spill_bytes, + s.stream_txns, + s.stream_count, + s.stream_bytes FROM pg_stat_get_replication_slots() AS s; CREATE VIEW pg_stat_slru AS diff --git a/src/backend/postmaster/pgstat.c b/src/backend/postmaster/pgstat.c index 12d6c59..0a6c452 100644 --- a/src/backend/postmaster/pgstat.c +++ b/src/backend/postmaster/pgstat.c @@ -1643,7 +1643,7 @@ pgstat_report_tempfile(size_t filesize) */ void pgstat_report_replslot(const char *slotname, int spilltxns, int spillcount, - int spillbytes) + int spillbytes, int streamtxns, int streamcount, int streambytes) { PgStat_MsgReplSlot msg; @@ -1656,6 +1656,9 @@ pgstat_report_replslot(const char *slotname, int spilltxns, int spillcount, msg.m_spill_txns = spilltxns; msg.m_spill_count = spillcount; msg.m_spill_bytes = spillbytes; + msg.m_stream_txns = streamtxns; + msg.m_stream_count = streamcount; + msg.m_stream_bytes = streambytes; pgstat_send(&msg, sizeof(PgStat_MsgReplSlot)); } @@ -6674,6 +6677,9 @@ pgstat_recv_replslot(PgStat_MsgReplSlot *msg, int len) replSlotStats[idx].spill_txns += msg->m_spill_txns; replSlotStats[idx].spill_count += msg->m_spill_count; replSlotStats[idx].spill_bytes += msg->m_spill_bytes; + replSlotStats[idx].stream_txns += msg->m_stream_txns; + replSlotStats[idx].stream_count += msg->m_stream_count; + replSlotStats[idx].stream_bytes += msg->m_stream_bytes; } } diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c index 2b216a3..d510068 100644 --- a/src/backend/replication/logical/logical.c +++ b/src/backend/replication/logical/logical.c @@ -1469,12 +1469,19 @@ UpdateSpillStats(LogicalDecodingContext *ctx) rb, (long long) rb->spillTxns, (long long) rb->spillCount, - (long long) rb->spillBytes); + (long long) rb->spillBytes, + (long long) rb->streamTxns, + (long long) rb->streamCount, + (long long) rb->streamBytes); pgstat_report_replslot(NameStr(ctx->slot->data.name), - rb->spillTxns, rb->spillCount, rb->spillBytes); + rb->spillTxns, rb->spillCount, rb->spillBytes, + rb->streamTxns, rb->streamCount, rb->streamBytes); rb->spillTxns = 0; rb->spillCount = 0; rb->spillBytes = 0; + rb->streamTxns = 0; + rb->streamCount = 0; + rb->streamBytes = 0; } diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c index 4ea0356..ac4422b 100644 --- a/src/backend/replication/logical/reorderbuffer.c +++ b/src/backend/replication/logical/reorderbuffer.c @@ -347,6 +347,9 @@ ReorderBufferAllocate(void) buffer->spillCount = 0; buffer->spillTxns = 0; buffer->spillBytes = 0; + buffer->streamCount = 0; + buffer->streamTxns = 0; + buffer->streamBytes = 0; buffer->current_restart_decoding_lsn = InvalidXLogRecPtr; @@ -3496,10 +3499,18 @@ ReorderBufferStreamTXN(ReorderBuffer *rb, ReorderBufferTXN *txn) txn->snapshot_now = NULL; } + + rb->streamCount += 1; + rb->streamBytes += txn->total_size; + + /* Don't consider already streamed transaction. */ + rb->streamTxns += (rbtxn_is_streamed(txn)) ? 0 : 1; + /* Process and send the changes to output plugin. */ ReorderBufferProcessTXN(rb, txn, InvalidXLogRecPtr, snapshot_now, command_id, true); + Assert(dlist_is_empty(&txn->changes)); Assert(txn->nentries == 0); Assert(txn->nentries_mem == 0); diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c index a677365..0d50e35 100644 --- a/src/backend/replication/slot.c +++ b/src/backend/replication/slot.c @@ -324,7 +324,7 @@ ReplicationSlotCreate(const char *name, bool db_specific, ConditionVariableBroadcast(&slot->active_cv); /* Create statistics entry for the new slot */ - pgstat_report_replslot(NameStr(slot->data.name), 0, 0, 0); + pgstat_report_replslot(NameStr(slot->data.name), 0, 0, 0, 0, 0, 0); } /* diff --git a/src/backend/utils/adt/pgstatfuncs.c b/src/backend/utils/adt/pgstatfuncs.c index 7cb186e..a26a503 100644 --- a/src/backend/utils/adt/pgstatfuncs.c +++ b/src/backend/utils/adt/pgstatfuncs.c @@ -2096,7 +2096,7 @@ pg_stat_get_archiver(PG_FUNCTION_ARGS) Datum pg_stat_get_replication_slots(PG_FUNCTION_ARGS) { -#define PG_STAT_GET_REPLICATION_SLOT_CLOS 4 +#define PG_STAT_GET_REPLICATION_SLOT_CLOS 7 ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo; TupleDesc tupdesc; Tuplestorestate *tupstore; @@ -2144,6 +2144,9 @@ pg_stat_get_replication_slots(PG_FUNCTION_ARGS) values[1] = Int64GetDatum(stat.spill_txns); values[2] = Int64GetDatum(stat.spill_count); values[3] = Int64GetDatum(stat.spill_bytes); + values[4] = Int64GetDatum(stat.stream_txns); + values[5] = Int64GetDatum(stat.stream_count); + values[6] = Int64GetDatum(stat.stream_bytes); tuplestore_putvalues(tupstore, tupdesc, values, nulls); } diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat index a3d94af..b538962 100644 --- a/src/include/catalog/pg_proc.dat +++ b/src/include/catalog/pg_proc.dat @@ -5255,9 +5255,9 @@ proname => 'pg_stat_get_replication_slots', prorows => '10', proisstrict => 'f', proretset => 't', provolatile => 's', proparallel => 'r', prorettype => 'record', proargtypes => '', - proallargtypes => '{text,int8,int8,int8}', - proargmodes => '{o,o,o,o}', - proargnames => '{name,spill_txns,spill_count,spill_bytes}', + proallargtypes => '{text,int8,int8,int8,int8,int8,int8}', + proargmodes => '{o,o,o,o,o,o,o}', + proargnames => '{name,spill_txns,spill_count,spill_bytes,stream_txns,stream_count,stream_bytes}', prosrc => 'pg_stat_get_replication_slots' }, { oid => '6118', descr => 'statistics: information about subscription', proname => 'pg_stat_get_subscription', proisstrict => 'f', provolatile => 's', diff --git a/src/include/pgstat.h b/src/include/pgstat.h index cdb9a65..c07cdd6 100644 --- a/src/include/pgstat.h +++ b/src/include/pgstat.h @@ -467,6 +467,9 @@ typedef struct PgStat_MsgReplSlot PgStat_Counter m_spill_txns; PgStat_Counter m_spill_count; PgStat_Counter m_spill_bytes; + PgStat_Counter m_stream_txns; + PgStat_Counter m_stream_count; + PgStat_Counter m_stream_bytes; } PgStat_MsgReplSlot; @@ -787,6 +790,9 @@ typedef struct PgStat_ReplSlotStats PgStat_Counter spill_txns; PgStat_Counter spill_count; PgStat_Counter spill_bytes; + PgStat_Counter stream_txns; + PgStat_Counter stream_count; + PgStat_Counter stream_bytes; } PgStat_ReplSlotStats; /* ---------- @@ -1344,7 +1350,7 @@ extern void pgstat_report_deadlock(void); extern void pgstat_report_checksum_failures_in_db(Oid dboid, int failurecount); extern void pgstat_report_checksum_failure(void); extern void pgstat_report_replslot(const char *slotname, int spilltxns, int spillcount, - int spillbytes); + int spillbytes, int streamtxns, int streamcount, int streambytes); extern void pgstat_report_replslot_drop(const char *slotname); extern void pgstat_initialize(void); diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h index fba950c..edc51b1 100644 --- a/src/include/replication/reorderbuffer.h +++ b/src/include/replication/reorderbuffer.h @@ -536,6 +536,9 @@ struct ReorderBuffer int64 spillCount; /* spill-to-disk invocation counter */ int64 spillTxns; /* number of transactions spilled to disk */ int64 spillBytes; /* amount of data spilled to disk */ + int64 streamCount; /* streaming invocation counter */ + int64 streamTxns; /* number of transactions spilled to disk */ + int64 streamBytes; /* amount of data streamed to subscriber */ }; diff --git a/src/test/regress/expected/rules.out b/src/test/regress/expected/rules.out index 5353f24..197a86c 100644 --- a/src/test/regress/expected/rules.out +++ b/src/test/regress/expected/rules.out @@ -2011,8 +2011,11 @@ pg_stat_replication| SELECT s.pid, pg_stat_replication_slots| SELECT s.name, s.spill_txns, s.spill_count, - s.spill_bytes - FROM pg_stat_get_replication_slots() s(name, spill_txns, spill_count, spill_bytes); + s.spill_bytes, + s.stream_txns, + s.stream_count, + s.stream_bytes + FROM pg_stat_get_replication_slots() s(name, spill_txns, spill_count, spill_bytes, stream_txns, stream_count, stream_bytes); pg_stat_slru| SELECT s.name, s.blks_zeroed, s.blks_hit,