From e0983c8cbcafbae78154af08785bb749c363650f Mon Sep 17 00:00:00 2001 From: Takamichi Osumi Date: Fri, 18 Feb 2022 14:12:14 +0000 Subject: [PATCH v22] Extend pg_stat_subscription_workers to include general transaction statistics Categorize transactions of logical replication subscriber into three types (commit, abort, error) and introduce cumulative columns of those numbers in the pg_stat_subscription_workers. In order to avoid having a large number of entries to be created by the table synchronization for many tables, the new stats columns are utilized only by the apply worker. The timing when the data of transaction statistics is sent to the stats collector is adjusted with PGSTAT_STAT_INTERVAL to avoid overload. Author: Takamichi Osumi Reviewed-by: Amit Kapila, Masahiko Sawada, Hou Zhijie, Greg Nancarrow, Vignesh C, Ajin Cherian, Kyotaro Horiguchi, Tang Haiying Tested-by: Wang wei Discussion: https://www.postgresql.org/message-id/OSBPR01MB48887CA8F40C8D984A6DC00CED199%40OSBPR01MB4888.jpnprd01.prod.outlook.com --- doc/src/sgml/monitoring.sgml | 45 ++++++++++++-- src/backend/catalog/system_views.sql | 3 + src/backend/postmaster/pgstat.c | 97 +++++++++++++++++++++++++++++- src/backend/replication/logical/launcher.c | 2 + src/backend/replication/logical/worker.c | 3 + src/backend/utils/adt/pgstatfuncs.c | 25 +++++--- src/include/catalog/pg_proc.dat | 6 +- src/include/pgstat.h | 27 +++++++++ src/include/replication/worker_internal.h | 6 ++ src/test/regress/expected/rules.out | 5 +- src/tools/pgindent/typedefs.list | 1 + 11 files changed, 201 insertions(+), 19 deletions(-) diff --git a/doc/src/sgml/monitoring.sgml b/doc/src/sgml/monitoring.sgml index bf7625d..3953492 100644 --- a/doc/src/sgml/monitoring.sgml +++ b/doc/src/sgml/monitoring.sgml @@ -629,8 +629,8 @@ postgres 27093 0.0 0.0 30096 2752 ? 
Ss 11:34 0:00 postgres: ser pg_stat_subscription_workerspg_stat_subscription_workers - One row per subscription worker, showing statistics about errors - that occurred on that subscription worker. + One row per subscription worker, showing statistics about transactions + and errors that occurred on that subscription worker. See pg_stat_subscription_workers for details. @@ -3072,10 +3072,11 @@ SELECT pid, wait_event_type, wait_event FROM pg_stat_activity WHERE wait_event i The pg_stat_subscription_workers view will contain - one row per subscription worker on which errors have occurred, for workers - applying logical replication changes and workers handling the initial data - copy of the subscribed tables. The statistics entry is removed when the - corresponding subscription is dropped. + one row for the main apply worker of each subscription, showing its + transaction statistics, and one row per subscription worker on which + errors have occurred, for workers applying logical replication changes + and workers handling the initial data copy of the subscribed tables. + The statistics entry is removed when the corresponding subscription is dropped. @@ -3123,6 +3124,38 @@ SELECT pid, wait_event_type, wait_event FROM pg_stat_activity WHERE wait_event i + commit_count bigint + + + Number of transactions successfully applied in this subscription. + Both COMMIT and COMMIT PREPARED + increment this counter. + + + + + + abort_count bigint + + + Number of transactions aborted in this subscription. + ROLLBACK PREPARED increments this counter. + + + + + + error_count bigint + + + Number of transactions that failed to be applied by the apply + worker in this subscription. This counter is incremented only when + the reported error differs from the previous one. 
+ + + + + last_error_relid oid diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql index 3cb69b1..e083a79 100644 --- a/src/backend/catalog/system_views.sql +++ b/src/backend/catalog/system_views.sql @@ -1269,6 +1269,9 @@ CREATE VIEW pg_stat_subscription_workers AS w.subid, s.subname, w.subrelid, + w.commit_count, + w.abort_count, + w.error_count, w.last_error_relid, w.last_error_command, w.last_error_xid, diff --git a/src/backend/postmaster/pgstat.c b/src/backend/postmaster/pgstat.c index 0646f53..13b1dae 100644 --- a/src/backend/postmaster/pgstat.c +++ b/src/backend/postmaster/pgstat.c @@ -55,6 +55,7 @@ #include "postmaster/postmaster.h" #include "replication/slot.h" #include "replication/walsender.h" +#include "replication/worker_internal.h" #include "storage/backendid.h" #include "storage/dsm.h" #include "storage/fd.h" @@ -341,6 +342,7 @@ static void pgstat_reset_replslot(PgStat_StatReplSlotEntry *slotstats, Timestamp static void pgstat_send_tabstat(PgStat_MsgTabstat *tsmsg, TimestampTz now); static void pgstat_send_funcstats(void); static void pgstat_send_slru(void); +static void pgstat_send_subworker_xact_stats(void); static void pgstat_send_subscription_purge(PgStat_MsgSubscriptionPurge *msg); static HTAB *pgstat_collect_oids(Oid catalogid, AttrNumber anum_oid); static bool pgstat_should_report_connstat(void); @@ -382,6 +384,7 @@ static void pgstat_recv_replslot(PgStat_MsgReplSlot *msg, int len); static void pgstat_recv_tempfile(PgStat_MsgTempFile *msg, int len); static void pgstat_recv_subscription_purge(PgStat_MsgSubscriptionPurge *msg, int len); static void pgstat_recv_subworker_error(PgStat_MsgSubWorkerError *msg, int len); +static void pgstat_recv_subworker_xact_end(PgStat_MsgSubWorkerXactEnd *msg, int len); /* ------------------------------------------------------------ * Public functions called from postmaster follow @@ -887,7 +890,10 @@ pgstat_report_stat(bool disconnect) * generates no WAL records can write or 
sync WAL data when flushing the * data pages. */ - if ((pgStatTabList == NULL || pgStatTabList->tsa_used == 0) && + if ((MyLogicalRepWorker == NULL || + (MyLogicalRepWorker->commit_count == 0 && + MyLogicalRepWorker->abort_count == 0)) && + (pgStatTabList == NULL || pgStatTabList->tsa_used == 0) && pgStatXactCommit == 0 && pgStatXactRollback == 0 && pgWalUsage.wal_records == prevWalUsage.wal_records && WalStats.m_wal_write == 0 && WalStats.m_wal_sync == 0 && @@ -986,8 +992,14 @@ pgstat_report_stat(bool disconnect) /* Send WAL statistics */ pgstat_send_wal(true); - /* Finally send SLRU statistics */ + /* Send SLRU statistics */ pgstat_send_slru(); + + /* Finally send the subworker statistics */ + if (MyLogicalRepWorker != NULL && + (MyLogicalRepWorker->commit_count > 0 || + MyLogicalRepWorker->abort_count > 0)) + pgstat_send_subworker_xact_stats(); } /* @@ -1977,6 +1989,23 @@ pgstat_report_subworker_error(Oid subid, Oid subrelid, Oid relid, } /* ---------- + * pgstat_report_subworker_xact_end() - + * + * Update the statistics of subscription worker and have + * pgstat_report_stat send a message to stats collector + * after count increment. + * ---------- + */ +void +pgstat_report_subworker_xact_end(bool is_commit) +{ + if (is_commit) + MyLogicalRepWorker->commit_count++; + else + MyLogicalRepWorker->abort_count++; +} + +/* ---------- * pgstat_report_subscription_drop() - * * Tell the collector about dropping the subscription. @@ -3465,6 +3494,35 @@ pgstat_send_slru(void) } } +/* ---------- + * pgstat_send_subworker_xact_stats() - + * + * Send a subworker's transaction stats to the collector. + * The statistics are cleared upon return. + * ---------- + */ +static void +pgstat_send_subworker_xact_stats(void) +{ + PgStat_MsgSubWorkerXactEnd msg; + + /* + * Prepare and send the message. 
+ */ + pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_SUBWORKERXACTEND); + msg.m_databaseid = MyDatabaseId; + msg.m_subid = MyLogicalRepWorker->subid; + msg.commit_count = MyLogicalRepWorker->commit_count; + msg.abort_count = MyLogicalRepWorker->abort_count; + pgstat_send(&msg, sizeof(PgStat_MsgSubWorkerXactEnd)); + + /* + * Clear out the statistics. + */ + MyLogicalRepWorker->commit_count = 0; + MyLogicalRepWorker->abort_count = 0; +} + /* -------- * pgstat_send_subscription_purge() - * @@ -3746,6 +3804,10 @@ PgstatCollectorMain(int argc, char *argv[]) pgstat_recv_subworker_error(&msg.msg_subworkererror, len); break; + case PGSTAT_MTYPE_SUBWORKERXACTEND: + pgstat_recv_subworker_xact_end(&msg.msg_subworkerxactend, len); + break; + default: break; } @@ -3965,6 +4027,9 @@ pgstat_get_subworker_entry(PgStat_StatDBEntry *dbentry, Oid subid, Oid subrelid, /* If not found, initialize the new one */ if (!found) { + subwentry->commit_count = 0; + subwentry->abort_count = 0; + subwentry->error_count = 0; subwentry->last_error_relid = InvalidOid; subwentry->last_error_command = 0; subwentry->last_error_xid = InvalidTransactionId; @@ -6193,6 +6258,34 @@ pgstat_recv_subworker_error(PgStat_MsgSubWorkerError *msg, int len) subwentry->last_error_time = msg->m_timestamp; strlcpy(subwentry->last_error_message, msg->m_message, PGSTAT_SUBWORKERERROR_MSGLEN); + + /* + * Increment the error counter only when this is a new error reported by + * the apply worker. + */ + if (!OidIsValid(msg->m_subrelid)) + subwentry->error_count++; +} + +/* ---------- + * pgstat_recv_subworker_xact_end() - + * + * Process a SUBWORKERXACTEND message. 
+ * ---------- + */ +static void +pgstat_recv_subworker_xact_end(PgStat_MsgSubWorkerXactEnd *msg, int len) +{ + PgStat_StatDBEntry *dbentry; + PgStat_StatSubWorkerEntry *wentry; + + dbentry = pgstat_get_db_entry(msg->m_databaseid, true); + wentry = pgstat_get_subworker_entry(dbentry, msg->m_subid, + InvalidOid, true); + Assert(wentry); + + wentry->commit_count += msg->commit_count; + wentry->abort_count += msg->abort_count; } /* ---------- diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c index 5a68d6d..93b1b93 100644 --- a/src/backend/replication/logical/launcher.c +++ b/src/backend/replication/logical/launcher.c @@ -385,6 +385,8 @@ retry: TIMESTAMP_NOBEGIN(worker->last_recv_time); worker->reply_lsn = InvalidXLogRecPtr; TIMESTAMP_NOBEGIN(worker->reply_time); + worker->commit_count = 0; + worker->abort_count = 0; /* Before releasing lock, remember generation for future identification. */ generation = worker->generation; diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index d77bb32..3977048 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -957,6 +957,7 @@ apply_handle_commit_prepared(StringInfo s) FinishPreparedTransaction(gid, true); end_replication_step(); CommitTransactionCommand(); + pgstat_report_subworker_xact_end(true); pgstat_report_stat(false); store_flush_position(prepare_data.end_lsn); @@ -1006,6 +1007,7 @@ apply_handle_rollback_prepared(StringInfo s) FinishPreparedTransaction(gid, false); end_replication_step(); CommitTransactionCommand(); + pgstat_report_subworker_xact_end(false); } pgstat_report_stat(false); @@ -1461,6 +1463,7 @@ apply_handle_commit_internal(LogicalRepCommitData *commit_data) replorigin_session_origin_timestamp = commit_data->committime; CommitTransactionCommand(); + pgstat_report_subworker_xact_end(true); pgstat_report_stat(false); store_flush_position(commit_data->end_lsn); diff 
--git a/src/backend/utils/adt/pgstatfuncs.c b/src/backend/utils/adt/pgstatfuncs.c index 30e8dfa..2f1aea3 100644 --- a/src/backend/utils/adt/pgstatfuncs.c +++ b/src/backend/utils/adt/pgstatfuncs.c @@ -2406,7 +2406,7 @@ pg_stat_get_replication_slot(PG_FUNCTION_ARGS) Datum pg_stat_get_subscription_worker(PG_FUNCTION_ARGS) { -#define PG_STAT_GET_SUBSCRIPTION_WORKER_COLS 8 +#define PG_STAT_GET_SUBSCRIPTION_WORKER_COLS 11 Oid subid = PG_GETARG_OID(0); Oid subrelid; TupleDesc tupdesc; @@ -2433,17 +2433,23 @@ pg_stat_get_subscription_worker(PG_FUNCTION_ARGS) OIDOID, -1, 0); TupleDescInitEntry(tupdesc, (AttrNumber) 2, "subrelid", OIDOID, -1, 0); - TupleDescInitEntry(tupdesc, (AttrNumber) 3, "last_error_relid", + TupleDescInitEntry(tupdesc, (AttrNumber) 3, "commit_count", + INT8OID, -1, 0); + TupleDescInitEntry(tupdesc, (AttrNumber) 4, "abort_count", + INT8OID, -1, 0); + TupleDescInitEntry(tupdesc, (AttrNumber) 5, "error_count", + INT8OID, -1, 0); + TupleDescInitEntry(tupdesc, (AttrNumber) 6, "last_error_relid", OIDOID, -1, 0); - TupleDescInitEntry(tupdesc, (AttrNumber) 4, "last_error_command", + TupleDescInitEntry(tupdesc, (AttrNumber) 7, "last_error_command", TEXTOID, -1, 0); - TupleDescInitEntry(tupdesc, (AttrNumber) 5, "last_error_xid", + TupleDescInitEntry(tupdesc, (AttrNumber) 8, "last_error_xid", XIDOID, -1, 0); - TupleDescInitEntry(tupdesc, (AttrNumber) 6, "last_error_count", + TupleDescInitEntry(tupdesc, (AttrNumber) 9, "last_error_count", INT8OID, -1, 0); - TupleDescInitEntry(tupdesc, (AttrNumber) 7, "last_error_message", + TupleDescInitEntry(tupdesc, (AttrNumber) 10, "last_error_message", TEXTOID, -1, 0); - TupleDescInitEntry(tupdesc, (AttrNumber) 8, "last_error_time", + TupleDescInitEntry(tupdesc, (AttrNumber) 11, "last_error_time", TIMESTAMPTZOID, -1, 0); BlessTupleDesc(tupdesc); @@ -2461,6 +2467,11 @@ pg_stat_get_subscription_worker(PG_FUNCTION_ARGS) else nulls[i++] = true; + /* transaction stats */ + values[i++] = Int64GetDatum(wentry->commit_count); + 
values[i++] = Int64GetDatum(wentry->abort_count); + values[i++] = Int64GetDatum(wentry->error_count); + /* last_error_relid */ if (OidIsValid(wentry->last_error_relid)) values[i++] = ObjectIdGetDatum(wentry->last_error_relid); diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat index 7f1ee97..fcce132 100644 --- a/src/include/catalog/pg_proc.dat +++ b/src/include/catalog/pg_proc.dat @@ -5381,9 +5381,9 @@ proname => 'pg_stat_get_subscription_worker', prorows => '1', proisstrict => 'f', proretset => 't', provolatile => 's', proparallel => 'r', prorettype => 'record', proargtypes => 'oid oid', - proallargtypes => '{oid,oid,oid,oid,oid,text,xid,int8,text,timestamptz}', - proargmodes => '{i,i,o,o,o,o,o,o,o,o}', - proargnames => '{subid,subrelid,subid,subrelid,last_error_relid,last_error_command,last_error_xid,last_error_count,last_error_message,last_error_time}', + proallargtypes => '{oid,oid,oid,oid,int8,int8,int8,oid,text,xid,int8,text,timestamptz}', + proargmodes => '{i,i,o,o,o,o,o,o,o,o,o,o,o}', + proargnames => '{subid,subrelid,subid,subrelid,commit_count,abort_count,error_count,last_error_relid,last_error_command,last_error_xid,last_error_count,last_error_message,last_error_time}', prosrc => 'pg_stat_get_subscription_worker' }, { oid => '6118', descr => 'statistics: information about subscription', proname => 'pg_stat_get_subscription', prorows => '10', proisstrict => 'f', diff --git a/src/include/pgstat.h b/src/include/pgstat.h index e10d202..989faf9 100644 --- a/src/include/pgstat.h +++ b/src/include/pgstat.h @@ -86,6 +86,7 @@ typedef enum StatMsgType PGSTAT_MTYPE_DISCONNECT, PGSTAT_MTYPE_SUBSCRIPTIONPURGE, PGSTAT_MTYPE_SUBWORKERERROR, + PGSTAT_MTYPE_SUBWORKERXACTEND } StatMsgType; /* ---------- @@ -591,6 +592,23 @@ typedef struct PgStat_MsgSubWorkerError } PgStat_MsgSubWorkerError; /* ---------- + * PgStat_MsgSubWorkerXactEnd Sent by the apply worker to report transaction + * ends. 
+ * ---------- + */ +typedef struct PgStat_MsgSubWorkerXactEnd +{ + PgStat_MsgHdr m_hdr; + + /* determine the worker entry */ + Oid m_databaseid; + Oid m_subid; + + PgStat_Counter commit_count; + PgStat_Counter abort_count; +} PgStat_MsgSubWorkerXactEnd; + +/* ---------- * PgStat_MsgRecoveryConflict Sent by the backend upon recovery conflict * ---------- */ @@ -769,6 +787,7 @@ typedef union PgStat_Msg PgStat_MsgDisconnect msg_disconnect; PgStat_MsgSubscriptionPurge msg_subscriptionpurge; PgStat_MsgSubWorkerError msg_subworkererror; + PgStat_MsgSubWorkerXactEnd msg_subworkerxactend; } PgStat_Msg; @@ -1010,6 +1029,13 @@ typedef struct PgStat_StatSubWorkerEntry PgStat_StatSubWorkerKey key; /* hash key (must be first) */ /* + * Cumulative transaction statistics of subscription worker + */ + PgStat_Counter commit_count; + PgStat_Counter abort_count; + PgStat_Counter error_count; + + /* * Subscription worker error statistics representing an error that * occurred during application of changes or the initial table * synchronization. @@ -1134,6 +1160,7 @@ extern void pgstat_report_replslot_drop(const char *slotname); extern void pgstat_report_subworker_error(Oid subid, Oid subrelid, Oid relid, LogicalRepMsgType command, TransactionId xid, const char *errmsg); +extern void pgstat_report_subworker_xact_end(bool is_commit); extern void pgstat_report_subscription_drop(Oid subid); extern void pgstat_initialize(void); diff --git a/src/include/replication/worker_internal.h b/src/include/replication/worker_internal.h index 3c3f5f6..2a3e02d 100644 --- a/src/include/replication/worker_internal.h +++ b/src/include/replication/worker_internal.h @@ -66,6 +66,12 @@ typedef struct LogicalRepWorker TimestampTz last_recv_time; XLogRecPtr reply_lsn; TimestampTz reply_time; + + /* + * Transaction statistics of subscription worker + */ + int64 commit_count; + int64 abort_count; } LogicalRepWorker; /* Main memory context for apply worker. Permanent during worker lifetime. 
*/ diff --git a/src/test/regress/expected/rules.out b/src/test/regress/expected/rules.out index 1420288..f459919 100644 --- a/src/test/regress/expected/rules.out +++ b/src/test/regress/expected/rules.out @@ -2075,6 +2075,9 @@ pg_stat_subscription| SELECT su.oid AS subid, pg_stat_subscription_workers| SELECT w.subid, s.subname, w.subrelid, + w.commit_count, + w.abort_count, + w.error_count, w.last_error_relid, w.last_error_command, w.last_error_xid, @@ -2088,7 +2091,7 @@ pg_stat_subscription_workers| SELECT w.subid, SELECT pg_subscription_rel.srsubid AS subid, pg_subscription_rel.srrelid AS relid FROM pg_subscription_rel) sr, - (LATERAL pg_stat_get_subscription_worker(sr.subid, sr.relid) w(subid, subrelid, last_error_relid, last_error_command, last_error_xid, last_error_count, last_error_message, last_error_time) + (LATERAL pg_stat_get_subscription_worker(sr.subid, sr.relid) w(subid, subrelid, commit_count, abort_count, error_count, last_error_relid, last_error_command, last_error_xid, last_error_count, last_error_message, last_error_time) JOIN pg_subscription s ON ((w.subid = s.oid))); pg_stat_sys_indexes| SELECT pg_stat_all_indexes.relid, pg_stat_all_indexes.indexrelid, diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list index 15684f5..04166e0 100644 --- a/src/tools/pgindent/typedefs.list +++ b/src/tools/pgindent/typedefs.list @@ -1946,6 +1946,7 @@ PgStat_MsgResetslrucounter PgStat_MsgSLRU PgStat_MsgSubscriptionPurge PgStat_MsgSubWorkerError +PgStat_MsgSubWorkerXactEnd PgStat_MsgTabpurge PgStat_MsgTabstat PgStat_MsgTempFile -- 1.8.3.1