]> git.ipfire.org Git - thirdparty/postgresql.git/commitdiff
Add support for more progress reporting in COPY
authorMichael Paquier <michael@paquier.xyz>
Tue, 9 Mar 2021 05:21:03 +0000 (14:21 +0900)
committerMichael Paquier <michael@paquier.xyz>
Tue, 9 Mar 2021 05:21:03 +0000 (14:21 +0900)
The command (TO or FROM), its type (file, pipe, program or callback),
and the number of tuples excluded by a WHERE clause in COPY FROM are
added to the progress reporting already available.

The column "lines_processed" is renamed to "tuples_processed" to
disambiguate the meaning of this column in the cases of CSV and BINARY
COPY and to be more consistent with the other catalog progress views.

Bump catalog version, again.

Author: Matthias van de Meent
Reviewed-by: Michael Paquier, Justin Pryzby, Bharath Rupireddy, Josef
Šimánek, Tomas Vondra
Discussion: https://postgr.es/m/CAEze2WiOcgdH4aQA8NtZq-4dgvnJzp8PohdeKchPkhMY-jWZXA@mail.gmail.com

doc/src/sgml/monitoring.sgml
src/backend/catalog/system_views.sql
src/backend/commands/copyfrom.c
src/backend/commands/copyto.c
src/include/catalog/catversion.h
src/include/commands/progress.h
src/test/regress/expected/rules.out

index 53692c0020b380df6069e9e7097ddeb2fcb79425..51f733840416214a587f24584d59fbaa5b57f07b 100644 (file)
@@ -6531,8 +6531,33 @@ SELECT pg_stat_get_backend_pid(s.backendid) AS pid,
        <structfield>relid</structfield> <type>oid</type>
       </para>
       <para>
-       OID of the table on which the <command>COPY</command> command is executed.
-       It is set to 0 if copying from a <command>SELECT</command> query.
+       OID of the table on which the <command>COPY</command> command is
+       executed. It is set to <literal>0</literal> if copying from a
+       <command>SELECT</command> query.
+      </para></entry>
+     </row>
+
+     <row>
+      <entry role="catalog_table_entry"><para role="column_definition">
+       <structfield>command</structfield> <type>text</type>
+      </para>
+      <para>
+       The command that is running: <literal>COPY FROM</literal>, or
+       <literal>COPY TO</literal>.
+      </para></entry>
+     </row>
+
+     <row>
+      <entry role="catalog_table_entry"><para role="column_definition">
+       <structfield>type</structfield> <type>text</type>
+      </para>
+      <para>
+       The io type that the data is read from or written to:
+       <literal>FILE</literal>, <literal>PROGRAM</literal>,
+       <literal>PIPE</literal> (for <command>COPY FROM STDIN</command> and
+       <command>COPY TO STDOUT</command>), or <literal>CALLBACK</literal>
+       (used for example during the initial table synchronization in
+       logical replication).
       </para></entry>
      </row>
 
@@ -6551,16 +6576,26 @@ SELECT pg_stat_get_backend_pid(s.backendid) AS pid,
       </para>
       <para>
        Size of source file for <command>COPY FROM</command> command in bytes.
-       It is set to 0 if not available.
+       It is set to <literal>0</literal> if not available.
+      </para></entry>
+     </row>
+
+     <row>
+      <entry role="catalog_table_entry"><para role="column_definition">
+       <structfield>tuples_processed</structfield> <type>bigint</type>
+      </para>
+      <para>
+       Number of tuples already processed by <command>COPY</command> command.
       </para></entry>
      </row>
 
      <row>
       <entry role="catalog_table_entry"><para role="column_definition">
-       <structfield>lines_processed</structfield> <type>bigint</type>
+       <structfield>tuples_excluded</structfield> <type>bigint</type>
       </para>
       <para>
-       Number of lines already processed by <command>COPY</command> command.
+       Number of tuples not processed because they were excluded by the
+       <command>WHERE</command> clause of the <command>COPY</command> command.
       </para></entry>
      </row>
     </tbody>
index fb1116d09ad25f2cb5820524a761660494c34eb5..15221be67d7e9c202b37216e5f4debd1a3e015da 100644 (file)
@@ -1129,9 +1129,18 @@ CREATE VIEW pg_stat_progress_copy AS
     SELECT
         S.pid AS pid, S.datid AS datid, D.datname AS datname,
         S.relid AS relid,
+        CASE S.param5 WHEN 1 THEN 'COPY FROM'
+                      WHEN 2 THEN 'COPY TO'
+                      END AS command,
+        CASE S.param6 WHEN 1 THEN 'FILE'
+                      WHEN 2 THEN 'PROGRAM'
+                      WHEN 3 THEN 'PIPE'
+                      WHEN 4 THEN 'CALLBACK'
+                      END AS "type",
         S.param1 AS bytes_processed,
         S.param2 AS bytes_total,
-        S.param3 AS lines_processed
+        S.param3 AS tuples_processed,
+        S.param4 AS tuples_excluded
     FROM pg_stat_get_progress_info('COPY') AS S
         LEFT JOIN pg_database D ON S.datid = D.oid;
 
index f05e2d23476d3784e256a4a394f13c80b95f604f..2ed696d429b2eb931008d50b43812a7850989f4c 100644 (file)
@@ -539,7 +539,8 @@ CopyFrom(CopyFromState cstate)
        BulkInsertState bistate = NULL;
        CopyInsertMethod insertMethod;
        CopyMultiInsertInfo multiInsertInfo = {0};      /* pacify compiler */
-       uint64          processed = 0;
+       int64           processed = 0;
+       int64           excluded = 0;
        bool            has_before_insert_row_trig;
        bool            has_instead_insert_row_trig;
        bool            leafpart_use_multi_insert = false;
@@ -869,7 +870,15 @@ CopyFrom(CopyFromState cstate)
                        econtext->ecxt_scantuple = myslot;
                        /* Skip items that don't match COPY's WHERE clause */
                        if (!ExecQual(cstate->qualexpr, econtext))
+                       {
+                               /*
+                                * Report that this tuple was filtered out by the WHERE
+                                * clause.
+                                */
+                               pgstat_progress_update_param(PROGRESS_COPY_TUPLES_EXCLUDED,
+                                                                                        ++excluded);
                                continue;
+                       }
                }
 
                /* Determine the partition to insert the tuple into */
@@ -1104,10 +1113,11 @@ CopyFrom(CopyFromState cstate)
                        /*
                         * We count only tuples not suppressed by a BEFORE INSERT trigger
                         * or FDW; this is the same definition used by nodeModifyTable.c
-                        * for counting tuples inserted by an INSERT command. Update
+                        * for counting tuples inserted by an INSERT command.  Update
                         * progress of the COPY command as well.
                         */
-                       pgstat_progress_update_param(PROGRESS_COPY_LINES_PROCESSED, ++processed);
+                       pgstat_progress_update_param(PROGRESS_COPY_TUPLES_PROCESSED,
+                                                                                ++processed);
                }
        }
 
@@ -1193,6 +1203,16 @@ BeginCopyFrom(ParseState *pstate,
        ExprState **defexprs;
        MemoryContext oldcontext;
        bool            volatile_defexprs;
+       const int       progress_cols[] = {
+               PROGRESS_COPY_COMMAND,
+               PROGRESS_COPY_TYPE,
+               PROGRESS_COPY_BYTES_TOTAL
+       };
+       int64           progress_vals[] = {
+               PROGRESS_COPY_COMMAND_FROM,
+               0,
+               0
+       };
 
        /* Allocate workspace and zero all fields */
        cstate = (CopyFromStateData *) palloc0(sizeof(CopyFromStateData));
@@ -1430,11 +1450,13 @@ BeginCopyFrom(ParseState *pstate,
 
        if (data_source_cb)
        {
+               progress_vals[1] = PROGRESS_COPY_TYPE_CALLBACK;
                cstate->copy_src = COPY_CALLBACK;
                cstate->data_source_cb = data_source_cb;
        }
        else if (pipe)
        {
+               progress_vals[1] = PROGRESS_COPY_TYPE_PIPE;
                Assert(!is_program);    /* the grammar does not allow this */
                if (whereToSendOutput == DestRemote)
                        ReceiveCopyBegin(cstate);
@@ -1447,6 +1469,7 @@ BeginCopyFrom(ParseState *pstate,
 
                if (cstate->is_program)
                {
+                       progress_vals[1] = PROGRESS_COPY_TYPE_PROGRAM;
                        cstate->copy_file = OpenPipeStream(cstate->filename, PG_BINARY_R);
                        if (cstate->copy_file == NULL)
                                ereport(ERROR,
@@ -1458,6 +1481,7 @@ BeginCopyFrom(ParseState *pstate,
                {
                        struct stat st;
 
+                       progress_vals[1] = PROGRESS_COPY_TYPE_FILE;
                        cstate->copy_file = AllocateFile(cstate->filename, PG_BINARY_R);
                        if (cstate->copy_file == NULL)
                        {
@@ -1484,10 +1508,12 @@ BeginCopyFrom(ParseState *pstate,
                                                (errcode(ERRCODE_WRONG_OBJECT_TYPE),
                                                 errmsg("\"%s\" is a directory", cstate->filename)));
 
-                       pgstat_progress_update_param(PROGRESS_COPY_BYTES_TOTAL, st.st_size);
+                       progress_vals[2] = st.st_size;
                }
        }
 
+       pgstat_progress_update_multi_param(3, progress_cols, progress_vals);
+
        if (cstate->opts.binary)
        {
                /* Read and verify binary header */
index 46155015cfd86b8420e87d9b7bb98411997e749e..7257a54e935010e0fce6008bba3ff849e08e0f8a 100644 (file)
@@ -353,6 +353,14 @@ BeginCopyTo(ParseState *pstate,
        TupleDesc       tupDesc;
        int                     num_phys_attrs;
        MemoryContext oldcontext;
+       const int       progress_cols[] = {
+               PROGRESS_COPY_COMMAND,
+               PROGRESS_COPY_TYPE
+       };
+       int64           progress_vals[] = {
+               PROGRESS_COPY_COMMAND_TO,
+               0
+       };
 
        if (rel != NULL && rel->rd_rel->relkind != RELKIND_RELATION)
        {
@@ -659,6 +667,8 @@ BeginCopyTo(ParseState *pstate,
 
        if (pipe)
        {
+               progress_vals[1] = PROGRESS_COPY_TYPE_PIPE;
+
                Assert(!is_program);    /* the grammar does not allow this */
                if (whereToSendOutput != DestRemote)
                        cstate->copy_file = stdout;
@@ -670,6 +680,7 @@ BeginCopyTo(ParseState *pstate,
 
                if (is_program)
                {
+                       progress_vals[1] = PROGRESS_COPY_TYPE_PROGRAM;
                        cstate->copy_file = OpenPipeStream(cstate->filename, PG_BINARY_W);
                        if (cstate->copy_file == NULL)
                                ereport(ERROR,
@@ -682,6 +693,8 @@ BeginCopyTo(ParseState *pstate,
                        mode_t          oumask; /* Pre-existing umask value */
                        struct stat st;
 
+                       progress_vals[1] = PROGRESS_COPY_TYPE_FILE;
+
                        /*
                         * Prevent write to relative path ... too easy to shoot oneself in
                         * the foot by overwriting a database file ...
@@ -731,6 +744,8 @@ BeginCopyTo(ParseState *pstate,
        /* initialize progress */
        pgstat_progress_start_command(PROGRESS_COMMAND_COPY,
                                                                  cstate->rel ? RelationGetRelid(cstate->rel) : InvalidOid);
+       pgstat_progress_update_multi_param(2, progress_cols, progress_vals);
+
        cstate->bytes_processed = 0;
 
        MemoryContextSwitchTo(oldcontext);
@@ -881,8 +896,12 @@ DoCopyTo(CopyToState cstate)
                        /* Format and send the data */
                        CopyOneRowTo(cstate, slot);
 
-                       /* Increment amount of processed tuples and update the progress */
-                       pgstat_progress_update_param(PROGRESS_COPY_LINES_PROCESSED, ++processed);
+                       /*
+                        * Increment the number of processed tuples, and report the
+                        * progress.
+                        */
+                       pgstat_progress_update_param(PROGRESS_COPY_TUPLES_PROCESSED,
+                                                                                ++processed);
                }
 
                ExecDropSingleTupleTableSlot(slot);
@@ -1251,8 +1270,9 @@ copy_dest_receive(TupleTableSlot *slot, DestReceiver *self)
        /* Send the data */
        CopyOneRowTo(cstate, slot);
 
-       /* Increment amount of processed tuples and update the progress */
-       pgstat_progress_update_param(PROGRESS_COPY_LINES_PROCESSED, ++myState->processed);
+       /* Increment the number of processed tuples, and report the progress */
+       pgstat_progress_update_param(PROGRESS_COPY_TUPLES_PROCESSED,
+                                                                ++myState->processed);
 
        return true;
 }
index 81fd68348d54d8b96ccb678730da876c2e43824f..b25a2b5a65268746400842d84253f731a5017255 100644 (file)
@@ -53,6 +53,6 @@
  */
 
 /*                                                     yyyymmddN */
-#define CATALOG_VERSION_NO     202103091
+#define CATALOG_VERSION_NO     202103092
 
 #endif
index 95ec5d02e9cc47df7295dc6117db350604b8eeec..c6b139d57d3dde25ec47606131c57b86dde4a455 100644 (file)
 #define PROGRESS_BASEBACKUP_PHASE_WAIT_WAL_ARCHIVE             4
 #define PROGRESS_BASEBACKUP_PHASE_TRANSFER_WAL                 5
 
-/* Commands of PROGRESS_COPY */
+/* Progress parameters for PROGRESS_COPY */
 #define PROGRESS_COPY_BYTES_PROCESSED 0
 #define PROGRESS_COPY_BYTES_TOTAL 1
-#define PROGRESS_COPY_LINES_PROCESSED 2
+#define PROGRESS_COPY_TUPLES_PROCESSED 2
+#define PROGRESS_COPY_TUPLES_EXCLUDED 3
+#define PROGRESS_COPY_COMMAND 4
+#define PROGRESS_COPY_TYPE 5
+
+/* Commands of COPY (as advertised via PROGRESS_COPY_COMMAND) */
+#define PROGRESS_COPY_COMMAND_FROM 1
+#define PROGRESS_COPY_COMMAND_TO 2
+
+/* Types of COPY commands (as advertised via PROGRESS_COPY_TYPE) */
+#define PROGRESS_COPY_TYPE_FILE 1
+#define PROGRESS_COPY_TYPE_PROGRAM 2
+#define PROGRESS_COPY_TYPE_PIPE 3
+#define PROGRESS_COPY_TYPE_CALLBACK 4
 
 #endif
index dd5cc9c2213a1b68b2b7865dd42343b37864a0cb..c2f8328d1897f746555d45ab7c428fb44b0004bf 100644 (file)
@@ -1950,9 +1950,22 @@ pg_stat_progress_copy| SELECT s.pid,
     s.datid,
     d.datname,
     s.relid,
+        CASE s.param5
+            WHEN 1 THEN 'COPY FROM'::text
+            WHEN 2 THEN 'COPY TO'::text
+            ELSE NULL::text
+        END AS command,
+        CASE s.param6
+            WHEN 1 THEN 'FILE'::text
+            WHEN 2 THEN 'PROGRAM'::text
+            WHEN 3 THEN 'PIPE'::text
+            WHEN 4 THEN 'CALLBACK'::text
+            ELSE NULL::text
+        END AS type,
     s.param1 AS bytes_processed,
     s.param2 AS bytes_total,
-    s.param3 AS lines_processed
+    s.param3 AS tuples_processed,
+    s.param4 AS tuples_excluded
    FROM (pg_stat_get_progress_info('COPY'::text) s(pid, datid, relid, param1, param2, param3, param4, param5, param6, param7, param8, param9, param10, param11, param12, param13, param14, param15, param16, param17, param18, param19, param20)
      LEFT JOIN pg_database d ON ((s.datid = d.oid)));
 pg_stat_progress_create_index| SELECT s.pid,