]> git.ipfire.org Git - thirdparty/postgresql.git/commitdiff
Add mem_exceeded_count column to pg_stat_replication_slots.
authorMasahiko Sawada <msawada@postgresql.org>
Wed, 8 Oct 2025 17:05:04 +0000 (10:05 -0700)
committerMasahiko Sawada <msawada@postgresql.org>
Wed, 8 Oct 2025 17:05:04 +0000 (10:05 -0700)
This commit introduces a new column mem_exceeded_count to the
pg_stat_replication_slots view. This counter tracks how often the
memory used by logical decoding exceeds the logical_decoding_work_mem
limit. The new statistic helps users determine whether exceeding the
logical_decoding_work_mem limit is a rare occurrences or a frequent
issue, information that wasn't available through existing statistics.

Bumps catversion.

Author: Bertrand Drouvot <bertranddrouvot.pg@gmail.com>
Reviewed-by: Masahiko Sawada <sawada.mshk@gmail.com>
Reviewed-by: Amit Kapila <amit.kapila16@gmail.com>
Reviewed-by: shveta malik <shveta.malik@gmail.com>
Reviewed-by: Ashutosh Bapat <ashutosh.bapat.oss@gmail.com>
Reviewed-by: Chao Li <li.evan.chao@gmail.com>
Discussion: https://postgr.es/m/978D21E8-9D3B-40EA-A4B1-F87BABE7868C@yesql.se

13 files changed:
contrib/test_decoding/expected/stats.out
contrib/test_decoding/sql/stats.sql
doc/src/sgml/monitoring.sgml
src/backend/catalog/system_views.sql
src/backend/replication/logical/logical.c
src/backend/replication/logical/reorderbuffer.c
src/backend/utils/activity/pgstat_replslot.c
src/backend/utils/adt/pgstatfuncs.c
src/include/catalog/catversion.h
src/include/catalog/pg_proc.dat
src/include/pgstat.h
src/include/replication/reorderbuffer.h
src/test/regress/expected/rules.out

index de6dc416130a0ac6ca2f6f2387fc634c4bbe4609..28da9123cc8b992ebaeca7ff1f481a14e84672bb 100644 (file)
@@ -37,12 +37,12 @@ SELECT pg_stat_force_next_flush();
  
 (1 row)
 
-SELECT slot_name, spill_txns = 0 AS spill_txns, spill_count = 0 AS spill_count, total_txns > 0 AS total_txns, total_bytes > 0 AS total_bytes FROM pg_stat_replication_slots ORDER BY slot_name;
-       slot_name        | spill_txns | spill_count | total_txns | total_bytes 
-------------------------+------------+-------------+------------+-------------
- regression_slot_stats1 | t          | t           | t          | t
- regression_slot_stats2 | t          | t           | t          | t
- regression_slot_stats3 | t          | t           | t          | t
+SELECT slot_name, spill_txns = 0 AS spill_txns, spill_count = 0 AS spill_count, total_txns > 0 AS total_txns, total_bytes > 0 AS total_bytes, mem_exceeded_count = 0 AS mem_exceeded_count FROM pg_stat_replication_slots ORDER BY slot_name;
+       slot_name        | spill_txns | spill_count | total_txns | total_bytes | mem_exceeded_count 
+------------------------+------------+-------------+------------+-------------+--------------------
+ regression_slot_stats1 | t          | t           | t          | t           | t
+ regression_slot_stats2 | t          | t           | t          | t           | t
+ regression_slot_stats3 | t          | t           | t          | t           | t
 (3 rows)
 
 RESET logical_decoding_work_mem;
@@ -53,12 +53,12 @@ SELECT pg_stat_reset_replication_slot('regression_slot_stats1');
  
 (1 row)
 
-SELECT slot_name, spill_txns = 0 AS spill_txns, spill_count = 0 AS spill_count, total_txns > 0 AS total_txns, total_bytes > 0 AS total_bytes FROM pg_stat_replication_slots ORDER BY slot_name;
-       slot_name        | spill_txns | spill_count | total_txns | total_bytes 
-------------------------+------------+-------------+------------+-------------
- regression_slot_stats1 | t          | t           | f          | f
- regression_slot_stats2 | t          | t           | t          | t
- regression_slot_stats3 | t          | t           | t          | t
+SELECT slot_name, spill_txns = 0 AS spill_txns, spill_count = 0 AS spill_count, total_txns > 0 AS total_txns, total_bytes > 0 AS total_bytes, mem_exceeded_count = 0 AS mem_exceeded_count FROM pg_stat_replication_slots ORDER BY slot_name;
+       slot_name        | spill_txns | spill_count | total_txns | total_bytes | mem_exceeded_count 
+------------------------+------------+-------------+------------+-------------+--------------------
+ regression_slot_stats1 | t          | t           | f          | f           | t
+ regression_slot_stats2 | t          | t           | t          | t           | t
+ regression_slot_stats3 | t          | t           | t          | t           | t
 (3 rows)
 
 -- reset stats for all slots
@@ -68,27 +68,27 @@ SELECT pg_stat_reset_replication_slot(NULL);
  
 (1 row)
 
-SELECT slot_name, spill_txns = 0 AS spill_txns, spill_count = 0 AS spill_count, total_txns > 0 AS total_txns, total_bytes > 0 AS total_bytes FROM pg_stat_replication_slots ORDER BY slot_name;
-       slot_name        | spill_txns | spill_count | total_txns | total_bytes 
-------------------------+------------+-------------+------------+-------------
- regression_slot_stats1 | t          | t           | f          | f
- regression_slot_stats2 | t          | t           | f          | f
- regression_slot_stats3 | t          | t           | f          | f
+SELECT slot_name, spill_txns = 0 AS spill_txns, spill_count = 0 AS spill_count, total_txns > 0 AS total_txns, total_bytes > 0 AS total_bytes, mem_exceeded_count = 0 AS mem_exceeded_count FROM pg_stat_replication_slots ORDER BY slot_name;
+       slot_name        | spill_txns | spill_count | total_txns | total_bytes | mem_exceeded_count 
+------------------------+------------+-------------+------------+-------------+--------------------
+ regression_slot_stats1 | t          | t           | f          | f           | t
+ regression_slot_stats2 | t          | t           | f          | f           | t
+ regression_slot_stats3 | t          | t           | f          | f           | t
 (3 rows)
 
 -- verify accessing/resetting stats for non-existent slot does something reasonable
 SELECT * FROM pg_stat_get_replication_slot('do-not-exist');
-  slot_name   | spill_txns | spill_count | spill_bytes | stream_txns | stream_count | stream_bytes | total_txns | total_bytes | stats_reset 
---------------+------------+-------------+-------------+-------------+--------------+--------------+------------+-------------+-------------
- do-not-exist |          0 |           0 |           0 |           0 |            0 |            0 |          0 |           0 | 
+  slot_name   | spill_txns | spill_count | spill_bytes | stream_txns | stream_count | stream_bytes | mem_exceeded_count | total_txns | total_bytes | stats_reset 
+--------------+------------+-------------+-------------+-------------+--------------+--------------+--------------------+------------+-------------+-------------
+ do-not-exist |          0 |           0 |           0 |           0 |            0 |            0 |                  0 |          0 |           0 | 
 (1 row)
 
 SELECT pg_stat_reset_replication_slot('do-not-exist');
 ERROR:  replication slot "do-not-exist" does not exist
 SELECT * FROM pg_stat_get_replication_slot('do-not-exist');
-  slot_name   | spill_txns | spill_count | spill_bytes | stream_txns | stream_count | stream_bytes | total_txns | total_bytes | stats_reset 
---------------+------------+-------------+-------------+-------------+--------------+--------------+------------+-------------+-------------
- do-not-exist |          0 |           0 |           0 |           0 |            0 |            0 |          0 |           0 | 
+  slot_name   | spill_txns | spill_count | spill_bytes | stream_txns | stream_count | stream_bytes | mem_exceeded_count | total_txns | total_bytes | stats_reset 
+--------------+------------+-------------+-------------+-------------+--------------+--------------+--------------------+------------+-------------+-------------
+ do-not-exist |          0 |           0 |           0 |           0 |            0 |            0 |                  0 |          0 |           0 | 
 (1 row)
 
 -- spilling the xact
@@ -110,12 +110,12 @@ SELECT pg_stat_force_next_flush();
  
 (1 row)
 
-SELECT slot_name, spill_txns > 0 AS spill_txns, spill_count > 0 AS spill_count FROM pg_stat_replication_slots;
-       slot_name        | spill_txns | spill_count 
-------------------------+------------+-------------
- regression_slot_stats1 | t          | t
- regression_slot_stats2 | f          | f
- regression_slot_stats3 | f          | f
+SELECT slot_name, spill_txns > 0 AS spill_txns, spill_count > 0 AS spill_count, mem_exceeded_count > 0 AS mem_exceeded_count FROM pg_stat_replication_slots;
+       slot_name        | spill_txns | spill_count | mem_exceeded_count 
+------------------------+------------+-------------+--------------------
+ regression_slot_stats1 | t          | t           | t
+ regression_slot_stats2 | f          | f           | f
+ regression_slot_stats3 | f          | f           | f
 (3 rows)
 
 -- Ensure stats can be repeatedly accessed using the same stats snapshot. See
@@ -159,16 +159,19 @@ SELECT count(*) FROM pg_logical_slot_get_changes('regression_slot_stats4_twophas
 (1 row)
 
 -- Verify that the decoding doesn't spill already-aborted transaction's changes.
+-- Given that there is no concurrent activities that are capturable by logical decoding,
+-- mem_exceeded_count should theoretically be 1 but we check if >0 here since it's
+-- more flexible for potential future changes and adequate for the testing purpose.
 SELECT pg_stat_force_next_flush();
  pg_stat_force_next_flush 
 --------------------------
  
 (1 row)
 
-SELECT slot_name, spill_txns, spill_count FROM pg_stat_replication_slots WHERE slot_name = 'regression_slot_stats4_twophase';
-            slot_name            | spill_txns | spill_count 
----------------------------------+------------+-------------
- regression_slot_stats4_twophase |          0 |           0
+SELECT slot_name, spill_txns, spill_count, mem_exceeded_count > 0 as mem_exceeded_count FROM pg_stat_replication_slots WHERE slot_name = 'regression_slot_stats4_twophase';
+            slot_name            | spill_txns | spill_count | mem_exceeded_count 
+---------------------------------+------------+-------------+--------------------
+ regression_slot_stats4_twophase |          0 |           0 | t
 (1 row)
 
 DROP TABLE stats_test;
index a022fe1bf0750eeae928b38e63fbb54095dbcae9..6661dbcb85c3a89a543caddd44b852722fcff379 100644 (file)
@@ -15,16 +15,16 @@ SELECT count(*) FROM pg_logical_slot_get_changes('regression_slot_stats1', NULL,
 SELECT count(*) FROM pg_logical_slot_get_changes('regression_slot_stats2', NULL, NULL, 'skip-empty-xacts', '1');
 SELECT count(*) FROM pg_logical_slot_get_changes('regression_slot_stats3', NULL, NULL, 'skip-empty-xacts', '1');
 SELECT pg_stat_force_next_flush();
-SELECT slot_name, spill_txns = 0 AS spill_txns, spill_count = 0 AS spill_count, total_txns > 0 AS total_txns, total_bytes > 0 AS total_bytes FROM pg_stat_replication_slots ORDER BY slot_name;
+SELECT slot_name, spill_txns = 0 AS spill_txns, spill_count = 0 AS spill_count, total_txns > 0 AS total_txns, total_bytes > 0 AS total_bytes, mem_exceeded_count = 0 AS mem_exceeded_count FROM pg_stat_replication_slots ORDER BY slot_name;
 RESET logical_decoding_work_mem;
 
 -- reset stats for one slot, others should be unaffected
 SELECT pg_stat_reset_replication_slot('regression_slot_stats1');
-SELECT slot_name, spill_txns = 0 AS spill_txns, spill_count = 0 AS spill_count, total_txns > 0 AS total_txns, total_bytes > 0 AS total_bytes FROM pg_stat_replication_slots ORDER BY slot_name;
+SELECT slot_name, spill_txns = 0 AS spill_txns, spill_count = 0 AS spill_count, total_txns > 0 AS total_txns, total_bytes > 0 AS total_bytes, mem_exceeded_count = 0 AS mem_exceeded_count FROM pg_stat_replication_slots ORDER BY slot_name;
 
 -- reset stats for all slots
 SELECT pg_stat_reset_replication_slot(NULL);
-SELECT slot_name, spill_txns = 0 AS spill_txns, spill_count = 0 AS spill_count, total_txns > 0 AS total_txns, total_bytes > 0 AS total_bytes FROM pg_stat_replication_slots ORDER BY slot_name;
+SELECT slot_name, spill_txns = 0 AS spill_txns, spill_count = 0 AS spill_count, total_txns > 0 AS total_txns, total_bytes > 0 AS total_bytes, mem_exceeded_count = 0 AS mem_exceeded_count FROM pg_stat_replication_slots ORDER BY slot_name;
 
 -- verify accessing/resetting stats for non-existent slot does something reasonable
 SELECT * FROM pg_stat_get_replication_slot('do-not-exist');
@@ -41,7 +41,7 @@ SELECT count(*) FROM pg_logical_slot_peek_changes('regression_slot_stats1', NULL
 -- background transaction (say by autovacuum) happens in parallel to the main
 -- transaction.
 SELECT pg_stat_force_next_flush();
-SELECT slot_name, spill_txns > 0 AS spill_txns, spill_count > 0 AS spill_count FROM pg_stat_replication_slots;
+SELECT slot_name, spill_txns > 0 AS spill_txns, spill_count > 0 AS spill_count, mem_exceeded_count > 0 AS mem_exceeded_count FROM pg_stat_replication_slots;
 
 -- Ensure stats can be repeatedly accessed using the same stats snapshot. See
 -- https://postgr.es/m/20210317230447.c7uc4g3vbs4wi32i%40alap3.anarazel.de
@@ -64,8 +64,11 @@ ROLLBACK PREPARED 'test1_abort';
 SELECT count(*) FROM pg_logical_slot_get_changes('regression_slot_stats4_twophase', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
 
 -- Verify that the decoding doesn't spill already-aborted transaction's changes.
+-- Given that there is no concurrent activities that are capturable by logical decoding,
+-- mem_exceeded_count should theoretically be 1 but we check if >0 here since it's
+-- more flexible for potential future changes and adequate for the testing purpose.
 SELECT pg_stat_force_next_flush();
-SELECT slot_name, spill_txns, spill_count FROM pg_stat_replication_slots WHERE slot_name = 'regression_slot_stats4_twophase';
+SELECT slot_name, spill_txns, spill_count, mem_exceeded_count > 0 as mem_exceeded_count FROM pg_stat_replication_slots WHERE slot_name = 'regression_slot_stats4_twophase';
 
 DROP TABLE stats_test;
 SELECT pg_drop_replication_slot('regression_slot_stats1'),
index 789ac16b444c4a4261bf1d1dc7d6e46b5bfdb39b..dc4fc29466d94e0656c91185ea58460a541a2c34 100644 (file)
@@ -1620,6 +1620,17 @@ description | Waiting for a newly initialized WAL file to reach durable storage
       </entry>
      </row>
 
+     <row>
+      <entry role="catalog_table_entry"><para role="column_definition">
+        <structfield>mem_exceeded_count</structfield><type>bigint</type>
+       </para>
+       <para>
+        Number of times the memory used by logical decoding has exceeded
+        <literal>logical_decoding_work_mem</literal>.
+       </para>
+      </entry>
+     </row>
+
      <row>
       <entry role="catalog_table_entry"><para role="column_definition">
         <structfield>total_txns</structfield> <type>bigint</type>
index c94f1f05f523d013407050161cc6cf6932937bd6..fcc86fd43be237b14297768047dc0902c8b241d5 100644 (file)
@@ -1063,6 +1063,7 @@ CREATE VIEW pg_stat_replication_slots AS
             s.stream_txns,
             s.stream_count,
             s.stream_bytes,
+            s.mem_exceeded_count,
             s.total_txns,
             s.total_bytes,
             s.stats_reset
index c68c0481f427ac77591a82fac2aa66ca2faa2607..93ed2eb368e1702f583d09599e4f4fc767a3f41e 100644 (file)
@@ -1955,10 +1955,11 @@ UpdateDecodingStats(LogicalDecodingContext *ctx)
        PgStat_StatReplSlotEntry repSlotStat;
 
        /* Nothing to do if we don't have any replication stats to be sent. */
-       if (rb->spillBytes <= 0 && rb->streamBytes <= 0 && rb->totalBytes <= 0)
+       if (rb->spillBytes <= 0 && rb->streamBytes <= 0 && rb->totalBytes <= 0 &&
+               rb->memExceededCount <= 0)
                return;
 
-       elog(DEBUG2, "UpdateDecodingStats: updating stats %p %" PRId64 " %" PRId64 " %" PRId64 " %" PRId64 " %" PRId64 " %" PRId64 " %" PRId64 " %" PRId64,
+       elog(DEBUG2, "UpdateDecodingStats: updating stats %p %" PRId64 " %" PRId64 " %" PRId64 " %" PRId64 " %" PRId64 " %" PRId64 " %" PRId64 " %" PRId64 " %" PRId64,
                 rb,
                 rb->spillTxns,
                 rb->spillCount,
@@ -1966,6 +1967,7 @@ UpdateDecodingStats(LogicalDecodingContext *ctx)
                 rb->streamTxns,
                 rb->streamCount,
                 rb->streamBytes,
+                rb->memExceededCount,
                 rb->totalTxns,
                 rb->totalBytes);
 
@@ -1975,6 +1977,7 @@ UpdateDecodingStats(LogicalDecodingContext *ctx)
        repSlotStat.stream_txns = rb->streamTxns;
        repSlotStat.stream_count = rb->streamCount;
        repSlotStat.stream_bytes = rb->streamBytes;
+       repSlotStat.mem_exceeded_count = rb->memExceededCount;
        repSlotStat.total_txns = rb->totalTxns;
        repSlotStat.total_bytes = rb->totalBytes;
 
@@ -1986,6 +1989,7 @@ UpdateDecodingStats(LogicalDecodingContext *ctx)
        rb->streamTxns = 0;
        rb->streamCount = 0;
        rb->streamBytes = 0;
+       rb->memExceededCount = 0;
        rb->totalTxns = 0;
        rb->totalBytes = 0;
 }
index a5e165fb123c83cf39330587de02b7680ebeeafa..a54434151debfb41d42f040ba4167527ee467d9e 100644 (file)
@@ -390,6 +390,7 @@ ReorderBufferAllocate(void)
        buffer->streamTxns = 0;
        buffer->streamCount = 0;
        buffer->streamBytes = 0;
+       buffer->memExceededCount = 0;
        buffer->totalTxns = 0;
        buffer->totalBytes = 0;
 
@@ -3898,14 +3899,26 @@ static void
 ReorderBufferCheckMemoryLimit(ReorderBuffer *rb)
 {
        ReorderBufferTXN *txn;
+       bool            update_stats = true;
 
-       /*
-        * Bail out if debug_logical_replication_streaming is buffered and we
-        * haven't exceeded the memory limit.
-        */
-       if (debug_logical_replication_streaming == DEBUG_LOGICAL_REP_STREAMING_BUFFERED &&
-               rb->size < logical_decoding_work_mem * (Size) 1024)
+       if (rb->size >= logical_decoding_work_mem * (Size) 1024)
+       {
+               /*
+                * Update the statistics as the memory usage has reached the limit. We
+                * report the statistics update later in this function since we can
+                * update the slot statistics altogether while streaming or
+                * serializing transactions in most cases.
+                */
+               rb->memExceededCount += 1;
+       }
+       else if (debug_logical_replication_streaming == DEBUG_LOGICAL_REP_STREAMING_BUFFERED)
+       {
+               /*
+                * Bail out if debug_logical_replication_streaming is buffered and we
+                * haven't exceeded the memory limit.
+                */
                return;
+       }
 
        /*
         * If debug_logical_replication_streaming is immediate, loop until there's
@@ -3965,8 +3978,17 @@ ReorderBufferCheckMemoryLimit(ReorderBuffer *rb)
                 */
                Assert(txn->size == 0);
                Assert(txn->nentries_mem == 0);
+
+               /*
+                * We've reported the memExceededCount update while streaming or
+                * serializing the transaction.
+                */
+               update_stats = false;
        }
 
+       if (update_stats)
+               UpdateDecodingStats((LogicalDecodingContext *) rb->private_data);
+
        /* We must be under the memory limit now. */
        Assert(rb->size < logical_decoding_work_mem * (Size) 1024);
 }
index ccfb11c49bf82793180ca1f031ab54d1eb1ba2e3..d210c261ac65f53fefe2fd5925c63a20a90316da 100644 (file)
@@ -94,6 +94,7 @@ pgstat_report_replslot(ReplicationSlot *slot, const PgStat_StatReplSlotEntry *re
        REPLSLOT_ACC(stream_txns);
        REPLSLOT_ACC(stream_count);
        REPLSLOT_ACC(stream_bytes);
+       REPLSLOT_ACC(mem_exceeded_count);
        REPLSLOT_ACC(total_txns);
        REPLSLOT_ACC(total_bytes);
 #undef REPLSLOT_ACC
index 3802a4cb88817d0e02f18fef996027213b70ff54..1fe33df2756ec87d99ebf124e576f943aa9aa912 100644 (file)
@@ -2121,7 +2121,7 @@ pg_stat_get_archiver(PG_FUNCTION_ARGS)
 Datum
 pg_stat_get_replication_slot(PG_FUNCTION_ARGS)
 {
-#define PG_STAT_GET_REPLICATION_SLOT_COLS 10
+#define PG_STAT_GET_REPLICATION_SLOT_COLS 11
        text       *slotname_text = PG_GETARG_TEXT_P(0);
        NameData        slotname;
        TupleDesc       tupdesc;
@@ -2146,11 +2146,13 @@ pg_stat_get_replication_slot(PG_FUNCTION_ARGS)
                                           INT8OID, -1, 0);
        TupleDescInitEntry(tupdesc, (AttrNumber) 7, "stream_bytes",
                                           INT8OID, -1, 0);
-       TupleDescInitEntry(tupdesc, (AttrNumber) 8, "total_txns",
+       TupleDescInitEntry(tupdesc, (AttrNumber) 8, "mem_exceeded_count",
                                           INT8OID, -1, 0);
-       TupleDescInitEntry(tupdesc, (AttrNumber) 9, "total_bytes",
+       TupleDescInitEntry(tupdesc, (AttrNumber) 9, "total_txns",
                                           INT8OID, -1, 0);
-       TupleDescInitEntry(tupdesc, (AttrNumber) 10, "stats_reset",
+       TupleDescInitEntry(tupdesc, (AttrNumber) 10, "total_bytes",
+                                          INT8OID, -1, 0);
+       TupleDescInitEntry(tupdesc, (AttrNumber) 11, "stats_reset",
                                           TIMESTAMPTZOID, -1, 0);
        BlessTupleDesc(tupdesc);
 
@@ -2173,13 +2175,14 @@ pg_stat_get_replication_slot(PG_FUNCTION_ARGS)
        values[4] = Int64GetDatum(slotent->stream_txns);
        values[5] = Int64GetDatum(slotent->stream_count);
        values[6] = Int64GetDatum(slotent->stream_bytes);
-       values[7] = Int64GetDatum(slotent->total_txns);
-       values[8] = Int64GetDatum(slotent->total_bytes);
+       values[7] = Int64GetDatum(slotent->mem_exceeded_count);
+       values[8] = Int64GetDatum(slotent->total_txns);
+       values[9] = Int64GetDatum(slotent->total_bytes);
 
        if (slotent->stat_reset_timestamp == 0)
-               nulls[9] = true;
+               nulls[10] = true;
        else
-               values[9] = TimestampTzGetDatum(slotent->stat_reset_timestamp);
+               values[10] = TimestampTzGetDatum(slotent->stat_reset_timestamp);
 
        /* Returns the record as Datum */
        PG_RETURN_DATUM(HeapTupleGetDatum(heap_form_tuple(tupdesc, values, nulls)));
index af54211f330f17a25ad266b8e6aff52ae4d664a9..a98c6d6d820610339c925cba6aa5187442995fd8 100644 (file)
@@ -57,6 +57,6 @@
  */
 
 /*                                                     yyyymmddN */
-#define CATALOG_VERSION_NO     202510082
+#define CATALOG_VERSION_NO     202510083
 
 #endif
index 6bb31892d1d862dff55ea79d1d8c2d20dddb1e80..25687eaecea54c9dfbe831c0b5aef2b96b2815f3 100644 (file)
 { oid => '6169', descr => 'statistics: information about replication slot',
   proname => 'pg_stat_get_replication_slot', provolatile => 's',
   proparallel => 'r', prorettype => 'record', proargtypes => 'text',
-  proallargtypes => '{text,text,int8,int8,int8,int8,int8,int8,int8,int8,timestamptz}',
-  proargmodes => '{i,o,o,o,o,o,o,o,o,o,o}',
-  proargnames => '{slot_name,slot_name,spill_txns,spill_count,spill_bytes,stream_txns,stream_count,stream_bytes,total_txns,total_bytes,stats_reset}',
+  proallargtypes => '{text,text,int8,int8,int8,int8,int8,int8,int8,int8,int8,timestamptz}',
+  proargmodes => '{i,o,o,o,o,o,o,o,o,o,o,o}',
+  proargnames => '{slot_name,slot_name,spill_txns,spill_count,spill_bytes,stream_txns,stream_count,stream_bytes,mem_exceeded_count,total_txns,total_bytes,stats_reset}',
   prosrc => 'pg_stat_get_replication_slot' },
 
 { oid => '6230', descr => 'statistics: check if a stats object exists',
index d24bf864a227fea55fce8b524d7069a9058246a0..bc8077cbae6364f4fdf42023a22e226c70b95fd5 100644 (file)
@@ -395,6 +395,7 @@ typedef struct PgStat_StatReplSlotEntry
        PgStat_Counter stream_txns;
        PgStat_Counter stream_count;
        PgStat_Counter stream_bytes;
+       PgStat_Counter mem_exceeded_count;
        PgStat_Counter total_txns;
        PgStat_Counter total_bytes;
        TimestampTz stat_reset_timestamp;
index 91dc7e5e448d8fd1f7cac3c5adb659167dba5b91..3cbe106a3c78654a5ea99c4e90346590c3e6788f 100644 (file)
@@ -690,6 +690,9 @@ struct ReorderBuffer
        int64           streamCount;    /* streaming invocation counter */
        int64           streamBytes;    /* amount of data decoded */
 
+       /* Number of times the logical_decoding_work_mem limit has been reached */
+       int64           memExceededCount;
+
        /*
         * Statistics about all the transactions sent to the decoding output
         * plugin
index 8859a5a885f46576ae8b8b63ca0384dd92cb5f73..d67af144d69f08abed7859a3edd72045c1b6069b 100644 (file)
@@ -2140,11 +2140,12 @@ pg_stat_replication_slots| SELECT s.slot_name,
     s.stream_txns,
     s.stream_count,
     s.stream_bytes,
+    s.mem_exceeded_count,
     s.total_txns,
     s.total_bytes,
     s.stats_reset
    FROM pg_replication_slots r,
-    LATERAL pg_stat_get_replication_slot((r.slot_name)::text) s(slot_name, spill_txns, spill_count, spill_bytes, stream_txns, stream_count, stream_bytes, total_txns, total_bytes, stats_reset)
+    LATERAL pg_stat_get_replication_slot((r.slot_name)::text) s(slot_name, spill_txns, spill_count, spill_bytes, stream_txns, stream_count, stream_bytes, mem_exceeded_count, total_txns, total_bytes, stats_reset)
   WHERE (r.datoid IS NOT NULL);
 pg_stat_slru| SELECT name,
     blks_zeroed,