]> git.ipfire.org Git - thirdparty/postgresql.git/commitdiff
Update replication statistics after every stream/spill.
authorAmit Kapila <akapila@postgresql.org>
Thu, 6 May 2021 05:51:26 +0000 (11:21 +0530)
committerAmit Kapila <akapila@postgresql.org>
Thu, 6 May 2021 05:51:26 +0000 (11:21 +0530)
Currently, replication slot statistics are updated at prepare, commit, and
rollback. Now, if the transaction is interrupted the stats might not get
updated. Fixed this by updating replication statistics after every
stream/spill.

In passing update the docs to change the description of some of the slot
stats.

Author: Vignesh C, Sawada Masahiko
Reviewed-by: Amit Kapila
Discussion: https://postgr.es/m/20210319185247.ldebgpdaxsowiflw@alap3.anarazel.de

doc/src/sgml/monitoring.sgml
src/backend/replication/logical/decode.c
src/backend/replication/logical/reorderbuffer.c
src/include/replication/reorderbuffer.h

index 886e626be802edd9df87424a1fb56714377a6a2e..370cdc2e1a11ef4f1eefd2c17881bae97248e6dd 100644 (file)
@@ -2708,10 +2708,10 @@ SELECT pid, wait_event_type, wait_event FROM pg_stat_activity WHERE wait_event i
         <structfield>stream_bytes</structfield><type>bigint</type>
        </para>
        <para>
-        Amount of decoded in-progress transaction data streamed to the decoding
-        output plugin while decoding changes from WAL for this slot. This and other
-        streaming counters for this slot can be used to gauge the network I/O which
-        occurred during logical decoding and allow tuning <literal>logical_decoding_work_mem</literal>.
+        Amount of transaction data decoded for streaming in-progress
+        transactions to the decoding output plugin while decoding changes from
+        WAL for this slot. This and other streaming counters for this slot can
+        be used to tune <literal>logical_decoding_work_mem</literal>.
        </para>
       </entry>
      </row>
@@ -2733,10 +2733,9 @@ SELECT pid, wait_event_type, wait_event FROM pg_stat_activity WHERE wait_event i
         <structfield>total_bytes</structfield><type>bigint</type>
        </para>
        <para>
-        Amount of decoded transaction data sent to the decoding output plugin
-        while decoding the changes from WAL for this slot. This can be used to
-        gauge the total amount of data sent during logical decoding. Note that
-        this includes data that is streamed and/or spilled.
+        Amount of transaction data decoded for sending transactions to the
+        decoding output plugin while decoding changes from WAL for this slot.
+        Note that this includes data that is streamed and/or spilled.
        </para>
       </entry>
      </row>
index 7924581cdcd08ce7d7403f1846a28ce8470d8ac4..888e064ec0f074b2259c96ca1b6a16f39449b1c9 100644 (file)
@@ -746,9 +746,10 @@ DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
        }
 
        /*
-        * Update the decoding stats at transaction prepare/commit/abort. It is
-        * not clear that sending more or less frequently than this would be
-        * better.
+        * Update the decoding stats at transaction prepare/commit/abort.
+        * Additionally we send the stats when we spill or stream the changes to
+        * avoid losing them in case the decoding is interrupted. It is not clear
+        * that sending more or less frequently than this would be better.
         */
        UpdateDecodingStats(ctx);
 }
@@ -828,9 +829,10 @@ DecodePrepare(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
        ReorderBufferPrepare(ctx->reorder, xid, parsed->twophase_gid);
 
        /*
-        * Update the decoding stats at transaction prepare/commit/abort. It is
-        * not clear that sending more or less frequently than this would be
-        * better.
+        * Update the decoding stats at transaction prepare/commit/abort.
+        * Additionally we send the stats when we spill or stream the changes to
+        * avoid losing them in case the decoding is interrupted. It is not clear
+        * that sending more or less frequently than this would be better.
         */
        UpdateDecodingStats(ctx);
 }
index c79425fbb73887606e0e7afa4b8d46f582873f2e..e80a195472e6290f56a10857174b07afaffec078 100644 (file)
@@ -3559,6 +3559,9 @@ ReorderBufferSerializeTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
 
                /* don't consider already serialized transactions */
                rb->spillTxns += (rbtxn_is_serialized(txn) || rbtxn_is_serialized_clear(txn)) ? 0 : 1;
+
+               /* update the decoding stats */
+               UpdateDecodingStats((LogicalDecodingContext *) rb->private_data);
        }
 
        Assert(spilled == txn->nentries_mem);
@@ -3928,6 +3931,9 @@ ReorderBufferStreamTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
        /* Don't consider already streamed transaction. */
        rb->streamTxns += (txn_is_streamed) ? 0 : 1;
 
+       /* update the decoding stats */
+       UpdateDecodingStats((LogicalDecodingContext *) rb->private_data);
+
        Assert(dlist_is_empty(&txn->changes));
        Assert(txn->nentries == 0);
        Assert(txn->nentries_mem == 0);
index bfab8303ee773a0f8accdb209559379bc4818c8c..53cdfa5d88f90ca22bb703c53fca77addaa2c85c 100644 (file)
@@ -617,14 +617,14 @@ struct ReorderBuffer
        /* Statistics about transactions streamed to the decoding output plugin */
        int64           streamTxns;             /* number of transactions streamed */
        int64           streamCount;    /* streaming invocation counter */
-       int64           streamBytes;    /* amount of data streamed */
+       int64           streamBytes;    /* amount of data decoded */
 
        /*
         * Statistics about all the transactions sent to the decoding output
         * plugin
         */
        int64           totalTxns;              /* total number of transactions sent */
-       int64           totalBytes;             /* total amount of data sent */
+       int64           totalBytes;             /* total amount of data decoded */
 };