diff --git a/doc/src/sgml/monitoring.sgml b/doc/src/sgml/monitoring.sgml
index 44a5985..8d3bc7d 100644
--- a/doc/src/sgml/monitoring.sgml
+++ b/doc/src/sgml/monitoring.sgml
@@ -2588,6 +2588,39 @@ SELECT pid, wait_event_type, wait_event FROM pg_stat_activity WHERE wait_event i
Amount of decoded transaction data spilled to disk.
+
+
+
+ stream_txns bigint
+
+
+ Number of in-progress transactions streamed to subscriber after
+ memory used by logical decoding exceeds logical_decoding_work_mem.
+ Streaming only works with toplevel transactions (subtransactions can't
+ be streamed independently), so the counter does not get incremented for
+ subtransactions.
+
+
+
+
+
+ stream_count bigint
+
+
+ Number of times in-progress transactions were streamed to subscriber.
+ Transactions may get streamed repeatedly, and this counter gets incremented
+ on every such invocation.
+
+
+
+
+
+ stream_bytes bigint
+
+
+ Amount of decoded in-progress transaction data streamed to subscriber.
+
+
diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql
index 4ab14eb..042278e 100644
--- a/src/backend/catalog/system_views.sql
+++ b/src/backend/catalog/system_views.sql
@@ -795,7 +795,10 @@ CREATE VIEW pg_stat_replication_slots AS
s.name,
s.spill_txns,
s.spill_count,
- s.spill_bytes
+ s.spill_bytes,
+ s.stream_txns,
+ s.stream_count,
+ s.stream_bytes
FROM pg_stat_get_replication_slots() AS s;
CREATE VIEW pg_stat_slru AS
diff --git a/src/backend/postmaster/pgstat.c b/src/backend/postmaster/pgstat.c
index 12d6c59..0a6c452 100644
--- a/src/backend/postmaster/pgstat.c
+++ b/src/backend/postmaster/pgstat.c
@@ -1643,7 +1643,7 @@ pgstat_report_tempfile(size_t filesize)
*/
void
pgstat_report_replslot(const char *slotname, int spilltxns, int spillcount,
- int spillbytes)
+ int spillbytes, int streamtxns, int streamcount, int streambytes)
{
PgStat_MsgReplSlot msg;
@@ -1656,6 +1656,9 @@ pgstat_report_replslot(const char *slotname, int spilltxns, int spillcount,
msg.m_spill_txns = spilltxns;
msg.m_spill_count = spillcount;
msg.m_spill_bytes = spillbytes;
+ msg.m_stream_txns = streamtxns;
+ msg.m_stream_count = streamcount;
+ msg.m_stream_bytes = streambytes;
pgstat_send(&msg, sizeof(PgStat_MsgReplSlot));
}
@@ -6674,6 +6677,9 @@ pgstat_recv_replslot(PgStat_MsgReplSlot *msg, int len)
replSlotStats[idx].spill_txns += msg->m_spill_txns;
replSlotStats[idx].spill_count += msg->m_spill_count;
replSlotStats[idx].spill_bytes += msg->m_spill_bytes;
+ replSlotStats[idx].stream_txns += msg->m_stream_txns;
+ replSlotStats[idx].stream_count += msg->m_stream_count;
+ replSlotStats[idx].stream_bytes += msg->m_stream_bytes;
}
}
diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
index 2b216a3..d510068 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -1469,12 +1469,19 @@ UpdateSpillStats(LogicalDecodingContext *ctx)
rb,
(long long) rb->spillTxns,
(long long) rb->spillCount,
- (long long) rb->spillBytes);
+ (long long) rb->spillBytes,
+ (long long) rb->streamTxns,
+ (long long) rb->streamCount,
+ (long long) rb->streamBytes);
pgstat_report_replslot(NameStr(ctx->slot->data.name),
- rb->spillTxns, rb->spillCount, rb->spillBytes);
+ rb->spillTxns, rb->spillCount, rb->spillBytes,
+ rb->streamTxns, rb->streamCount, rb->streamBytes);
rb->spillTxns = 0;
rb->spillCount = 0;
rb->spillBytes = 0;
+ rb->streamTxns = 0;
+ rb->streamCount = 0;
+ rb->streamBytes = 0;
}
diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c
index 4ea0356..ac4422b 100644
--- a/src/backend/replication/logical/reorderbuffer.c
+++ b/src/backend/replication/logical/reorderbuffer.c
@@ -347,6 +347,9 @@ ReorderBufferAllocate(void)
buffer->spillCount = 0;
buffer->spillTxns = 0;
buffer->spillBytes = 0;
+ buffer->streamCount = 0;
+ buffer->streamTxns = 0;
+ buffer->streamBytes = 0;
buffer->current_restart_decoding_lsn = InvalidXLogRecPtr;
@@ -3496,10 +3499,18 @@ ReorderBufferStreamTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
txn->snapshot_now = NULL;
}
+
+ rb->streamCount += 1;
+ rb->streamBytes += txn->total_size;
+
+ /* Don't consider already streamed transaction. */
+ rb->streamTxns += (rbtxn_is_streamed(txn)) ? 0 : 1;
+
/* Process and send the changes to output plugin. */
ReorderBufferProcessTXN(rb, txn, InvalidXLogRecPtr, snapshot_now,
command_id, true);
+
Assert(dlist_is_empty(&txn->changes));
Assert(txn->nentries == 0);
Assert(txn->nentries_mem == 0);
diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c
index a677365..0d50e35 100644
--- a/src/backend/replication/slot.c
+++ b/src/backend/replication/slot.c
@@ -324,7 +324,7 @@ ReplicationSlotCreate(const char *name, bool db_specific,
ConditionVariableBroadcast(&slot->active_cv);
/* Create statistics entry for the new slot */
- pgstat_report_replslot(NameStr(slot->data.name), 0, 0, 0);
+ pgstat_report_replslot(NameStr(slot->data.name), 0, 0, 0, 0, 0, 0);
}
/*
diff --git a/src/backend/utils/adt/pgstatfuncs.c b/src/backend/utils/adt/pgstatfuncs.c
index 7cb186e..a26a503 100644
--- a/src/backend/utils/adt/pgstatfuncs.c
+++ b/src/backend/utils/adt/pgstatfuncs.c
@@ -2096,7 +2096,7 @@ pg_stat_get_archiver(PG_FUNCTION_ARGS)
Datum
pg_stat_get_replication_slots(PG_FUNCTION_ARGS)
{
-#define PG_STAT_GET_REPLICATION_SLOT_CLOS 4
+#define PG_STAT_GET_REPLICATION_SLOT_CLOS 7
ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
TupleDesc tupdesc;
Tuplestorestate *tupstore;
@@ -2144,6 +2144,9 @@ pg_stat_get_replication_slots(PG_FUNCTION_ARGS)
values[1] = Int64GetDatum(stat.spill_txns);
values[2] = Int64GetDatum(stat.spill_count);
values[3] = Int64GetDatum(stat.spill_bytes);
+ values[4] = Int64GetDatum(stat.stream_txns);
+ values[5] = Int64GetDatum(stat.stream_count);
+ values[6] = Int64GetDatum(stat.stream_bytes);
tuplestore_putvalues(tupstore, tupdesc, values, nulls);
}
diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat
index a3d94af..b538962 100644
--- a/src/include/catalog/pg_proc.dat
+++ b/src/include/catalog/pg_proc.dat
@@ -5255,9 +5255,9 @@
proname => 'pg_stat_get_replication_slots', prorows => '10', proisstrict => 'f',
proretset => 't', provolatile => 's', proparallel => 'r',
prorettype => 'record', proargtypes => '',
- proallargtypes => '{text,int8,int8,int8}',
- proargmodes => '{o,o,o,o}',
- proargnames => '{name,spill_txns,spill_count,spill_bytes}',
+ proallargtypes => '{text,int8,int8,int8,int8,int8,int8}',
+ proargmodes => '{o,o,o,o,o,o,o}',
+ proargnames => '{name,spill_txns,spill_count,spill_bytes,stream_txns,stream_count,stream_bytes}',
prosrc => 'pg_stat_get_replication_slots' },
{ oid => '6118', descr => 'statistics: information about subscription',
proname => 'pg_stat_get_subscription', proisstrict => 'f', provolatile => 's',
diff --git a/src/include/pgstat.h b/src/include/pgstat.h
index cdb9a65..c07cdd6 100644
--- a/src/include/pgstat.h
+++ b/src/include/pgstat.h
@@ -467,6 +467,9 @@ typedef struct PgStat_MsgReplSlot
PgStat_Counter m_spill_txns;
PgStat_Counter m_spill_count;
PgStat_Counter m_spill_bytes;
+ PgStat_Counter m_stream_txns;
+ PgStat_Counter m_stream_count;
+ PgStat_Counter m_stream_bytes;
} PgStat_MsgReplSlot;
@@ -787,6 +790,9 @@ typedef struct PgStat_ReplSlotStats
PgStat_Counter spill_txns;
PgStat_Counter spill_count;
PgStat_Counter spill_bytes;
+ PgStat_Counter stream_txns;
+ PgStat_Counter stream_count;
+ PgStat_Counter stream_bytes;
} PgStat_ReplSlotStats;
/* ----------
@@ -1344,7 +1350,7 @@ extern void pgstat_report_deadlock(void);
extern void pgstat_report_checksum_failures_in_db(Oid dboid, int failurecount);
extern void pgstat_report_checksum_failure(void);
extern void pgstat_report_replslot(const char *slotname, int spilltxns, int spillcount,
- int spillbytes);
+ int spillbytes, int streamtxns, int streamcount, int streambytes);
extern void pgstat_report_replslot_drop(const char *slotname);
extern void pgstat_initialize(void);
diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h
index fba950c..edc51b1 100644
--- a/src/include/replication/reorderbuffer.h
+++ b/src/include/replication/reorderbuffer.h
@@ -536,6 +536,9 @@ struct ReorderBuffer
int64 spillCount; /* spill-to-disk invocation counter */
int64 spillTxns; /* number of transactions spilled to disk */
int64 spillBytes; /* amount of data spilled to disk */
+ int64 streamCount; /* streaming invocation counter */
+ int64 streamTxns; /* number of transactions spilled to disk */
+ int64 streamBytes; /* amount of data streamed to subscriber */
};
diff --git a/src/test/regress/expected/rules.out b/src/test/regress/expected/rules.out
index 5353f24..197a86c 100644
--- a/src/test/regress/expected/rules.out
+++ b/src/test/regress/expected/rules.out
@@ -2011,8 +2011,11 @@ pg_stat_replication| SELECT s.pid,
pg_stat_replication_slots| SELECT s.name,
s.spill_txns,
s.spill_count,
- s.spill_bytes
- FROM pg_stat_get_replication_slots() s(name, spill_txns, spill_count, spill_bytes);
+ s.spill_bytes,
+ s.stream_txns,
+ s.stream_count,
+ s.stream_bytes
+ FROM pg_stat_get_replication_slots() s(name, spill_txns, spill_count, spill_bytes, stream_txns, stream_count, stream_bytes);
pg_stat_slru| SELECT s.name,
s.blks_zeroed,
s.blks_hit,