git.ipfire.org Git - thirdparty/postgresql.git/commitdiff
Refactor COPY FROM to use format callback functions.
author: Masahiko Sawada <msawada@postgresql.org>
Fri, 28 Feb 2025 18:29:36 +0000 (10:29 -0800)
committer: Masahiko Sawada <msawada@postgresql.org>
Fri, 28 Feb 2025 18:29:36 +0000 (10:29 -0800)
This commit introduces a new CopyFromRoutine struct, which is a set of
callback routines to read tuples in a specific format. It also makes
COPY FROM with the existing formats (text, CSV, and binary) utilize
these format callbacks.

This change is a preliminary step towards making the COPY FROM command
extensible in terms of input formats.

Similar to 2e4127b6d2d, this refactoring contributes to a performance
improvement by reducing the number of "if" branches that need to be
checked on a per-row basis when reading field representations in text
or CSV mode. The performance benchmark results showed ~5% performance
gain in text or CSV mode.

Author: Sutou Kouhei <kou@clear-code.com>
Reviewed-by: Masahiko Sawada <sawada.mshk@gmail.com>
Reviewed-by: Michael Paquier <michael@paquier.xyz>
Reviewed-by: Andres Freund <andres@anarazel.de>
Reviewed-by: Tomas Vondra <tomas.vondra@enterprisedb.com>
Reviewed-by: Junwang Zhao <zhjwpku@gmail.com>
Discussion: https://postgr.es/m/20231204.153548.2126325458835528809.kou@clear-code.com

src/backend/commands/copyfrom.c
src/backend/commands/copyfromparse.c
src/include/commands/copy.h
src/include/commands/copyapi.h
src/include/commands/copyfrom_internal.h
src/tools/pgindent/typedefs.list

index 8875d79d59ad78bb4b221e86bad9aeaa7dfbefb6..198cee2bc48b980742eb328b4ea6123a7bbd8959 100644 (file)
@@ -28,7 +28,7 @@
 #include "access/tableam.h"
 #include "access/xact.h"
 #include "catalog/namespace.h"
-#include "commands/copy.h"
+#include "commands/copyapi.h"
 #include "commands/copyfrom_internal.h"
 #include "commands/progress.h"
 #include "commands/trigger.h"
@@ -106,6 +106,145 @@ typedef struct CopyMultiInsertInfo
 /* non-export function prototypes */
 static void ClosePipeFromProgram(CopyFromState cstate);
 
+/*
+ * Built-in format-specific routines. One-row callbacks are defined in
+ * copyfromparse.c.
+ */
+static void CopyFromTextLikeInFunc(CopyFromState cstate, Oid atttypid, FmgrInfo *finfo,
+                                                                  Oid *typioparam);
+static void CopyFromTextLikeStart(CopyFromState cstate, TupleDesc tupDesc);
+static void CopyFromTextLikeEnd(CopyFromState cstate);
+static void CopyFromBinaryInFunc(CopyFromState cstate, Oid atttypid,
+                                                                FmgrInfo *finfo, Oid *typioparam);
+static void CopyFromBinaryStart(CopyFromState cstate, TupleDesc tupDesc);
+static void CopyFromBinaryEnd(CopyFromState cstate);
+
+
+/*
+ * COPY FROM routines for built-in formats.
+ *
+ * CSV and text formats share the same TextLike routines except for the
+ * one-row callback.
+ */
+
+/*
+ * Callback table for text format.
+ *
+ * The infunc, start and end callbacks are the shared TextLike
+ * implementations; only the per-row callback is specific to text format.
+ */
+static const CopyFromRoutine CopyFromRoutineText = {
+       .CopyFromInFunc = CopyFromTextLikeInFunc,
+       .CopyFromStart = CopyFromTextLikeStart,
+       .CopyFromOneRow = CopyFromTextOneRow,
+       .CopyFromEnd = CopyFromTextLikeEnd,
+};
+
+/*
+ * Callback table for CSV format.
+ *
+ * The infunc, start and end callbacks are the shared TextLike
+ * implementations; only the per-row callback is specific to CSV format.
+ */
+static const CopyFromRoutine CopyFromRoutineCSV = {
+       .CopyFromInFunc = CopyFromTextLikeInFunc,
+       .CopyFromStart = CopyFromTextLikeStart,
+       .CopyFromOneRow = CopyFromCSVOneRow,
+       .CopyFromEnd = CopyFromTextLikeEnd,
+};
+
+/*
+ * Callback table for binary format.
+ *
+ * All four callbacks are binary-specific; nothing is shared with the
+ * text and CSV tables.
+ */
+static const CopyFromRoutine CopyFromRoutineBinary = {
+       .CopyFromInFunc = CopyFromBinaryInFunc,
+       .CopyFromStart = CopyFromBinaryStart,
+       .CopyFromOneRow = CopyFromBinaryOneRow,
+       .CopyFromEnd = CopyFromBinaryEnd,
+};
+
+/*
+ * Return a COPY FROM routine for the given options.
+ *
+ * opts.csv_mode is tested first, then opts.binary; when neither is set
+ * the text routine is returned.  The result points at one of the static
+ * const callback tables, so the caller must not modify or free it.
+ *
+ * NOTE(review): opts is taken by value, so the CopyFormatOptions struct
+ * is copied for this call; a const pointer would avoid the copy but
+ * would require changing the caller as well.
+ */
+static const CopyFromRoutine *
+CopyFromGetRoutine(CopyFormatOptions opts)
+{
+       if (opts.csv_mode)
+               return &CopyFromRoutineCSV;
+       else if (opts.binary)
+               return &CopyFromRoutineBinary;
+
+       /* default is text */
+       return &CopyFromRoutineText;
+}
+
+/*
+ * Implementation of the start callback for text and CSV formats.
+ *
+ * Prepares the parsing state kept in cstate: input_buf (a separately
+ * allocated buffer when transcoding is needed, otherwise an alias of
+ * raw_buf), the input_reached_eof flag, line_buf, and the raw_fields
+ * array with one slot per entry of attnumlist.
+ *
+ * tupDesc is not used by this implementation.
+ */
+static void
+CopyFromTextLikeStart(CopyFromState cstate, TupleDesc tupDesc)
+{
+       AttrNumber      attr_count;
+
+       /*
+        * If encoding conversion is needed, we need another buffer to hold the
+        * converted input data.  Otherwise, we can just point input_buf to the
+        * same buffer as raw_buf.
+        */
+       if (cstate->need_transcoding)
+       {
+               cstate->input_buf = (char *) palloc(INPUT_BUF_SIZE + 1);
+               cstate->input_buf_index = cstate->input_buf_len = 0;
+       }
+       else
+               cstate->input_buf = cstate->raw_buf;
+       cstate->input_reached_eof = false;
+
+       initStringInfo(&cstate->line_buf);
+
+       /*
+        * Create workspace for CopyReadAttributes results; used by CSV and text
+        * format.
+        */
+       attr_count = list_length(cstate->attnumlist);
+       cstate->max_fields = attr_count;
+       cstate->raw_fields = (char **) palloc(attr_count * sizeof(char *));
+}
+
+/*
+ * Implementation of the infunc callback for text and CSV formats. Assign
+ * the input function data to the given *finfo.
+ *
+ * The function OID and *typioparam are obtained from getTypeInputInfo()
+ * for atttypid; the OID is then resolved into *finfo with fmgr_info().
+ * cstate is not used by this implementation.
+ */
+static void
+CopyFromTextLikeInFunc(CopyFromState cstate, Oid atttypid, FmgrInfo *finfo,
+                                          Oid *typioparam)
+{
+       Oid                     func_oid;
+
+       getTypeInputInfo(atttypid, &func_oid, typioparam);
+       fmgr_info(func_oid, finfo);
+}
+
+/*
+ * Implementation of the end callback for text and CSV formats.
+ *
+ * Deliberately empty: this callback releases nothing itself.  It exists
+ * so that the text and CSV routine tables have a CopyFromEnd entry.
+ */
+static void
+CopyFromTextLikeEnd(CopyFromState cstate)
+{
+       /* nothing to do */
+}
+
+/*
+ * Implementation of the start callback for binary format.
+ *
+ * The only work done here is reading and verifying the binary header via
+ * ReceiveCopyBinaryHeader().  tupDesc is not used by this implementation.
+ */
+static void
+CopyFromBinaryStart(CopyFromState cstate, TupleDesc tupDesc)
+{
+       /* Read and verify binary header */
+       ReceiveCopyBinaryHeader(cstate);
+}
+
+/*
+ * Implementation of the infunc callback for binary format. Assign
+ * the binary input function to the given *finfo.
+ *
+ * The function OID and *typioparam are obtained from
+ * getTypeBinaryInputInfo() for atttypid; the OID is then resolved into
+ * *finfo with fmgr_info().  cstate is not used by this implementation.
+ */
+static void
+CopyFromBinaryInFunc(CopyFromState cstate, Oid atttypid,
+                                        FmgrInfo *finfo, Oid *typioparam)
+{
+       Oid                     func_oid;
+
+       getTypeBinaryInputInfo(atttypid, &func_oid, typioparam);
+       fmgr_info(func_oid, finfo);
+}
+
+/*
+ * Implementation of the end callback for binary format.
+ *
+ * Deliberately empty: this callback releases nothing itself.  It exists
+ * so that the binary routine table has a CopyFromEnd entry.
+ */
+static void
+CopyFromBinaryEnd(CopyFromState cstate)
+{
+       /* nothing to do */
+}
+
 /*
  * error context callback for COPY FROM
  *
@@ -1403,7 +1542,6 @@ BeginCopyFrom(ParseState *pstate,
                                num_defaults;
        FmgrInfo   *in_functions;
        Oid                *typioparams;
-       Oid                     in_func_oid;
        int                *defmap;
        ExprState **defexprs;
        MemoryContext oldcontext;
@@ -1435,6 +1573,9 @@ BeginCopyFrom(ParseState *pstate,
        /* Extract options from the statement node tree */
        ProcessCopyOptions(pstate, &cstate->opts, true /* is_from */ , options);
 
+       /* Set the format routine */
+       cstate->routine = CopyFromGetRoutine(cstate->opts);
+
        /* Process the target relation */
        cstate->rel = rel;
 
@@ -1590,25 +1731,6 @@ BeginCopyFrom(ParseState *pstate,
        cstate->raw_buf_index = cstate->raw_buf_len = 0;
        cstate->raw_reached_eof = false;
 
-       if (!cstate->opts.binary)
-       {
-               /*
-                * If encoding conversion is needed, we need another buffer to hold
-                * the converted input data.  Otherwise, we can just point input_buf
-                * to the same buffer as raw_buf.
-                */
-               if (cstate->need_transcoding)
-               {
-                       cstate->input_buf = (char *) palloc(INPUT_BUF_SIZE + 1);
-                       cstate->input_buf_index = cstate->input_buf_len = 0;
-               }
-               else
-                       cstate->input_buf = cstate->raw_buf;
-               cstate->input_reached_eof = false;
-
-               initStringInfo(&cstate->line_buf);
-       }
-
        initStringInfo(&cstate->attribute_buf);
 
        /* Assign range table and rteperminfos, we'll need them in CopyFrom. */
@@ -1641,13 +1763,9 @@ BeginCopyFrom(ParseState *pstate,
                        continue;
 
                /* Fetch the input function and typioparam info */
-               if (cstate->opts.binary)
-                       getTypeBinaryInputInfo(att->atttypid,
-                                                                  &in_func_oid, &typioparams[attnum - 1]);
-               else
-                       getTypeInputInfo(att->atttypid,
-                                                        &in_func_oid, &typioparams[attnum - 1]);
-               fmgr_info(in_func_oid, &in_functions[attnum - 1]);
+               cstate->routine->CopyFromInFunc(cstate, att->atttypid,
+                                                                               &in_functions[attnum - 1],
+                                                                               &typioparams[attnum - 1]);
 
                /* Get default info if available */
                defexprs[attnum - 1] = NULL;
@@ -1782,20 +1900,7 @@ BeginCopyFrom(ParseState *pstate,
 
        pgstat_progress_update_multi_param(3, progress_cols, progress_vals);
 
-       if (cstate->opts.binary)
-       {
-               /* Read and verify binary header */
-               ReceiveCopyBinaryHeader(cstate);
-       }
-
-       /* create workspace for CopyReadAttributes results */
-       if (!cstate->opts.binary)
-       {
-               AttrNumber      attr_count = list_length(cstate->attnumlist);
-
-               cstate->max_fields = attr_count;
-               cstate->raw_fields = (char **) palloc(attr_count * sizeof(char *));
-       }
+       cstate->routine->CopyFromStart(cstate, tupDesc);
 
        MemoryContextSwitchTo(oldcontext);
 
@@ -1808,6 +1913,9 @@ BeginCopyFrom(ParseState *pstate,
 void
 EndCopyFrom(CopyFromState cstate)
 {
+       /* Invoke the end callback */
+       cstate->routine->CopyFromEnd(cstate);
+
        /* No COPY FROM related resources except memory. */
        if (cstate->is_program)
        {
index caccdc8563c042dec92401ccdec70ae1759dca3d..bad577aa67b099735d0d2eecbf62bf8032509f23 100644 (file)
@@ -62,7 +62,7 @@
 #include <unistd.h>
 #include <sys/stat.h>
 
-#include "commands/copy.h"
+#include "commands/copyapi.h"
 #include "commands/copyfrom_internal.h"
 #include "commands/progress.h"
 #include "executor/executor.h"
@@ -140,13 +140,18 @@ static const char BinarySignature[11] = "PGCOPY\n\377\r\n\0";
 
 
 /* non-export function prototypes */
-static bool CopyReadLine(CopyFromState cstate);
-static bool CopyReadLineText(CopyFromState cstate);
+static bool CopyReadLine(CopyFromState cstate, bool is_csv);
+static bool CopyReadLineText(CopyFromState cstate, bool is_csv);
 static int     CopyReadAttributesText(CopyFromState cstate);
 static int     CopyReadAttributesCSV(CopyFromState cstate);
 static Datum CopyReadBinaryAttribute(CopyFromState cstate, FmgrInfo *flinfo,
                                                                         Oid typioparam, int32 typmod,
                                                                         bool *isnull);
+static pg_attribute_always_inline bool CopyFromTextLikeOneRow(CopyFromState cstate,
+                                                                                                                         ExprContext *econtext,
+                                                                                                                         Datum *values,
+                                                                                                                         bool *nulls,
+                                                                                                                         bool is_csv);
 
 
 /* Low-level communications functions */
@@ -740,9 +745,12 @@ CopyReadBinaryData(CopyFromState cstate, char *dest, int nbytes)
  * in the relation.
  *
  * NOTE: force_not_null option are not applied to the returned fields.
+ *
+ * We use pg_attribute_always_inline to reduce function call overhead
+ * and to help compilers to optimize away the 'is_csv' condition.
  */
-bool
-NextCopyFromRawFields(CopyFromState cstate, char ***fields, int *nfields)
+static pg_attribute_always_inline bool
+NextCopyFromRawFields(CopyFromState cstate, char ***fields, int *nfields, bool is_csv)
 {
        int                     fldct;
        bool            done;
@@ -759,13 +767,13 @@ NextCopyFromRawFields(CopyFromState cstate, char ***fields, int *nfields)
                tupDesc = RelationGetDescr(cstate->rel);
 
                cstate->cur_lineno++;
-               done = CopyReadLine(cstate);
+               done = CopyReadLine(cstate, is_csv);
 
                if (cstate->opts.header_line == COPY_HEADER_MATCH)
                {
                        int                     fldnum;
 
-                       if (cstate->opts.csv_mode)
+                       if (is_csv)
                                fldct = CopyReadAttributesCSV(cstate);
                        else
                                fldct = CopyReadAttributesText(cstate);
@@ -809,7 +817,7 @@ NextCopyFromRawFields(CopyFromState cstate, char ***fields, int *nfields)
        cstate->cur_lineno++;
 
        /* Actually read the line into memory here */
-       done = CopyReadLine(cstate);
+       done = CopyReadLine(cstate, is_csv);
 
        /*
         * EOF at start of line means we're done.  If we see EOF after some
@@ -820,7 +828,7 @@ NextCopyFromRawFields(CopyFromState cstate, char ***fields, int *nfields)
                return false;
 
        /* Parse the line into de-escaped field values */
-       if (cstate->opts.csv_mode)
+       if (is_csv)
                fldct = CopyReadAttributesCSV(cstate);
        else
                fldct = CopyReadAttributesText(cstate);
@@ -847,233 +855,275 @@ NextCopyFrom(CopyFromState cstate, ExprContext *econtext,
 {
        TupleDesc       tupDesc;
        AttrNumber      num_phys_attrs,
-                               attr_count,
                                num_defaults = cstate->num_defaults;
-       FmgrInfo   *in_functions = cstate->in_functions;
-       Oid                *typioparams = cstate->typioparams;
        int                     i;
        int                *defmap = cstate->defmap;
        ExprState **defexprs = cstate->defexprs;
 
        tupDesc = RelationGetDescr(cstate->rel);
        num_phys_attrs = tupDesc->natts;
-       attr_count = list_length(cstate->attnumlist);
 
        /* Initialize all values for row to NULL */
        MemSet(values, 0, num_phys_attrs * sizeof(Datum));
        MemSet(nulls, true, num_phys_attrs * sizeof(bool));
        MemSet(cstate->defaults, false, num_phys_attrs * sizeof(bool));
 
-       if (!cstate->opts.binary)
+       /* Get one row from source */
+       if (!cstate->routine->CopyFromOneRow(cstate, econtext, values, nulls))
+               return false;
+
+       /*
+        * Now compute and insert any defaults available for the columns not
+        * provided by the input data.  Anything not processed here or above will
+        * remain NULL.
+        */
+       for (i = 0; i < num_defaults; i++)
        {
-               char      **field_strings;
-               ListCell   *cur;
-               int                     fldct;
-               int                     fieldno;
-               char       *string;
+               /*
+                * The caller must supply econtext and have switched into the
+                * per-tuple memory context in it.
+                */
+               Assert(econtext != NULL);
+               Assert(CurrentMemoryContext == econtext->ecxt_per_tuple_memory);
 
-               /* read raw fields in the next line */
-               if (!NextCopyFromRawFields(cstate, &field_strings, &fldct))
-                       return false;
+               values[defmap[i]] = ExecEvalExpr(defexprs[defmap[i]], econtext,
+                                                                                &nulls[defmap[i]]);
+       }
+
+       return true;
+}
+
+/*
+ * Implementation of the per-row callback for text format.
+ *
+ * Thin wrapper around CopyFromTextLikeOneRow() that passes is_csv = false
+ * and returns its result unchanged.
+ */
+bool
+CopyFromTextOneRow(CopyFromState cstate, ExprContext *econtext, Datum *values,
+                                  bool *nulls)
+{
+       return CopyFromTextLikeOneRow(cstate, econtext, values, nulls, false);
+}
+
+/*
+ * Implementation of the per-row callback for CSV format.
+ *
+ * Thin wrapper around CopyFromTextLikeOneRow() that passes is_csv = true
+ * and returns its result unchanged.
+ */
+bool
+CopyFromCSVOneRow(CopyFromState cstate, ExprContext *econtext, Datum *values,
+                                 bool *nulls)
+{
+       return CopyFromTextLikeOneRow(cstate, econtext, values, nulls, true);
+}
 
-               /* check for overflowing fields */
-               if (attr_count > 0 && fldct > attr_count)
+/*
+ * Workhorse for CopyFromTextOneRow() and CopyFromCSVOneRow().
+ *
+ * We use pg_attribute_always_inline to reduce function call overhead
+ * and to help compilers to optimize away the 'is_csv' condition.
+ */
+static pg_attribute_always_inline bool
+CopyFromTextLikeOneRow(CopyFromState cstate, ExprContext *econtext,
+                                          Datum *values, bool *nulls, bool is_csv)
+{
+       TupleDesc       tupDesc;
+       AttrNumber      attr_count;
+       FmgrInfo   *in_functions = cstate->in_functions;
+       Oid                *typioparams = cstate->typioparams;
+       ExprState **defexprs = cstate->defexprs;
+       char      **field_strings;
+       ListCell   *cur;
+       int                     fldct;
+       int                     fieldno;
+       char       *string;
+
+       tupDesc = RelationGetDescr(cstate->rel);
+       attr_count = list_length(cstate->attnumlist);
+
+       /* read raw fields in the next line */
+       if (!NextCopyFromRawFields(cstate, &field_strings, &fldct, is_csv))
+               return false;
+
+       /* check for overflowing fields */
+       if (attr_count > 0 && fldct > attr_count)
+               ereport(ERROR,
+                               (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
+                                errmsg("extra data after last expected column")));
+
+       fieldno = 0;
+
+       /* Loop to read the user attributes on the line. */
+       foreach(cur, cstate->attnumlist)
+       {
+               int                     attnum = lfirst_int(cur);
+               int                     m = attnum - 1;
+               Form_pg_attribute att = TupleDescAttr(tupDesc, m);
+
+               if (fieldno >= fldct)
                        ereport(ERROR,
                                        (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
-                                        errmsg("extra data after last expected column")));
-
-               fieldno = 0;
+                                        errmsg("missing data for column \"%s\"",
+                                                       NameStr(att->attname))));
+               string = field_strings[fieldno++];
 
-               /* Loop to read the user attributes on the line. */
-               foreach(cur, cstate->attnumlist)
+               if (cstate->convert_select_flags &&
+                       !cstate->convert_select_flags[m])
                {
-                       int                     attnum = lfirst_int(cur);
-                       int                     m = attnum - 1;
-                       Form_pg_attribute att = TupleDescAttr(tupDesc, m);
-
-                       if (fieldno >= fldct)
-                               ereport(ERROR,
-                                               (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
-                                                errmsg("missing data for column \"%s\"",
-                                                               NameStr(att->attname))));
-                       string = field_strings[fieldno++];
+                       /* ignore input field, leaving column as NULL */
+                       continue;
+               }
 
-                       if (cstate->convert_select_flags &&
-                               !cstate->convert_select_flags[m])
+               if (is_csv)
+               {
+                       if (string == NULL &&
+                               cstate->opts.force_notnull_flags[m])
                        {
-                               /* ignore input field, leaving column as NULL */
-                               continue;
+                               /*
+                                * FORCE_NOT_NULL option is set and column is NULL - convert
+                                * it to the NULL string.
+                                */
+                               string = cstate->opts.null_print;
                        }
-
-                       if (cstate->opts.csv_mode)
+                       else if (string != NULL && cstate->opts.force_null_flags[m]
+                                        && strcmp(string, cstate->opts.null_print) == 0)
                        {
-                               if (string == NULL &&
-                                       cstate->opts.force_notnull_flags[m])
-                               {
-                                       /*
-                                        * FORCE_NOT_NULL option is set and column is NULL -
-                                        * convert it to the NULL string.
-                                        */
-                                       string = cstate->opts.null_print;
-                               }
-                               else if (string != NULL && cstate->opts.force_null_flags[m]
-                                                && strcmp(string, cstate->opts.null_print) == 0)
-                               {
-                                       /*
-                                        * FORCE_NULL option is set and column matches the NULL
-                                        * string. It must have been quoted, or otherwise the
-                                        * string would already have been set to NULL. Convert it
-                                        * to NULL as specified.
-                                        */
-                                       string = NULL;
-                               }
+                               /*
+                                * FORCE_NULL option is set and column matches the NULL
+                                * string. It must have been quoted, or otherwise the string
+                                * would already have been set to NULL. Convert it to NULL as
+                                * specified.
+                                */
+                               string = NULL;
                        }
+               }
 
-                       cstate->cur_attname = NameStr(att->attname);
-                       cstate->cur_attval = string;
+               cstate->cur_attname = NameStr(att->attname);
+               cstate->cur_attval = string;
 
-                       if (string != NULL)
-                               nulls[m] = false;
+               if (string != NULL)
+                       nulls[m] = false;
 
-                       if (cstate->defaults[m])
-                       {
-                               /*
-                                * The caller must supply econtext and have switched into the
-                                * per-tuple memory context in it.
-                                */
-                               Assert(econtext != NULL);
-                               Assert(CurrentMemoryContext == econtext->ecxt_per_tuple_memory);
+               if (cstate->defaults[m])
+               {
+                       /* We must have switched into the per-tuple memory context */
+                       Assert(econtext != NULL);
+                       Assert(CurrentMemoryContext == econtext->ecxt_per_tuple_memory);
 
-                               values[m] = ExecEvalExpr(defexprs[m], econtext, &nulls[m]);
-                       }
+                       values[m] = ExecEvalExpr(defexprs[m], econtext, &nulls[m]);
+               }
 
-                       /*
-                        * If ON_ERROR is specified with IGNORE, skip rows with soft
-                        * errors
-                        */
-                       else if (!InputFunctionCallSafe(&in_functions[m],
-                                                                                       string,
-                                                                                       typioparams[m],
-                                                                                       att->atttypmod,
-                                                                                       (Node *) cstate->escontext,
-                                                                                       &values[m]))
-                       {
-                               Assert(cstate->opts.on_error != COPY_ON_ERROR_STOP);
+               /*
+                * If ON_ERROR is specified with IGNORE, skip rows with soft errors
+                */
+               else if (!InputFunctionCallSafe(&in_functions[m],
+                                                                               string,
+                                                                               typioparams[m],
+                                                                               att->atttypmod,
+                                                                               (Node *) cstate->escontext,
+                                                                               &values[m]))
+               {
+                       Assert(cstate->opts.on_error != COPY_ON_ERROR_STOP);
 
-                               cstate->num_errors++;
+                       cstate->num_errors++;
 
-                               if (cstate->opts.log_verbosity == COPY_LOG_VERBOSITY_VERBOSE)
-                               {
-                                       /*
-                                        * Since we emit line number and column info in the below
-                                        * notice message, we suppress error context information
-                                        * other than the relation name.
-                                        */
-                                       Assert(!cstate->relname_only);
-                                       cstate->relname_only = true;
+                       if (cstate->opts.log_verbosity == COPY_LOG_VERBOSITY_VERBOSE)
+                       {
+                               /*
+                                * Since we emit line number and column info in the below
+                                * notice message, we suppress error context information other
+                                * than the relation name.
+                                */
+                               Assert(!cstate->relname_only);
+                               cstate->relname_only = true;
 
-                                       if (cstate->cur_attval)
-                                       {
-                                               char       *attval;
-
-                                               attval = CopyLimitPrintoutLength(cstate->cur_attval);
-                                               ereport(NOTICE,
-                                                               errmsg("skipping row due to data type incompatibility at line %llu for column \"%s\": \"%s\"",
-                                                                          (unsigned long long) cstate->cur_lineno,
-                                                                          cstate->cur_attname,
-                                                                          attval));
-                                               pfree(attval);
-                                       }
-                                       else
-                                               ereport(NOTICE,
-                                                               errmsg("skipping row due to data type incompatibility at line %llu for column \"%s\": null input",
-                                                                          (unsigned long long) cstate->cur_lineno,
-                                                                          cstate->cur_attname));
-
-                                       /* reset relname_only */
-                                       cstate->relname_only = false;
+                               if (cstate->cur_attval)
+                               {
+                                       char       *attval;
+
+                                       attval = CopyLimitPrintoutLength(cstate->cur_attval);
+                                       ereport(NOTICE,
+                                                       errmsg("skipping row due to data type incompatibility at line %llu for column \"%s\": \"%s\"",
+                                                                  (unsigned long long) cstate->cur_lineno,
+                                                                  cstate->cur_attname,
+                                                                  attval));
+                                       pfree(attval);
                                }
+                               else
+                                       ereport(NOTICE,
+                                                       errmsg("skipping row due to data type incompatibility at line %llu for column \"%s\": null input",
+                                                                  (unsigned long long) cstate->cur_lineno,
+                                                                  cstate->cur_attname));
 
-                               return true;
+                               /* reset relname_only */
+                               cstate->relname_only = false;
                        }
 
-                       cstate->cur_attname = NULL;
-                       cstate->cur_attval = NULL;
+                       return true;
                }
 
-               Assert(fieldno == attr_count);
+               cstate->cur_attname = NULL;
+               cstate->cur_attval = NULL;
        }
-       else
-       {
-               /* binary */
-               int16           fld_count;
-               ListCell   *cur;
 
-               cstate->cur_lineno++;
+       Assert(fieldno == attr_count);
 
-               if (!CopyGetInt16(cstate, &fld_count))
-               {
-                       /* EOF detected (end of file, or protocol-level EOF) */
-                       return false;
-               }
+       return true;
+}
 
-               if (fld_count == -1)
-               {
-                       /*
-                        * Received EOF marker.  Wait for the protocol-level EOF, and
-                        * complain if it doesn't come immediately.  In COPY FROM STDIN,
-                        * this ensures that we correctly handle CopyFail, if client
-                        * chooses to send that now.  When copying from file, we could
-                        * ignore the rest of the file like in text mode, but we choose to
-                        * be consistent with the COPY FROM STDIN case.
-                        */
-                       char            dummy;
+/* Implementation of the per-row callback for binary format */
+bool
+CopyFromBinaryOneRow(CopyFromState cstate, ExprContext *econtext, Datum *values,
+                                        bool *nulls)
+{
+       TupleDesc       tupDesc;
+       AttrNumber      attr_count;
+       FmgrInfo   *in_functions = cstate->in_functions;
+       Oid                *typioparams = cstate->typioparams;
+       int16           fld_count;
+       ListCell   *cur;
 
-                       if (CopyReadBinaryData(cstate, &dummy, 1) > 0)
-                               ereport(ERROR,
-                                               (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
-                                                errmsg("received copy data after EOF marker")));
-                       return false;
-               }
+       tupDesc = RelationGetDescr(cstate->rel);
+       attr_count = list_length(cstate->attnumlist);
 
-               if (fld_count != attr_count)
-                       ereport(ERROR,
-                                       (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
-                                        errmsg("row field count is %d, expected %d",
-                                                       (int) fld_count, attr_count)));
+       cstate->cur_lineno++;
 
-               foreach(cur, cstate->attnumlist)
-               {
-                       int                     attnum = lfirst_int(cur);
-                       int                     m = attnum - 1;
-                       Form_pg_attribute att = TupleDescAttr(tupDesc, m);
-
-                       cstate->cur_attname = NameStr(att->attname);
-                       values[m] = CopyReadBinaryAttribute(cstate,
-                                                                                               &in_functions[m],
-                                                                                               typioparams[m],
-                                                                                               att->atttypmod,
-                                                                                               &nulls[m]);
-                       cstate->cur_attname = NULL;
-               }
+       if (!CopyGetInt16(cstate, &fld_count))
+       {
+               /* EOF detected (end of file, or protocol-level EOF) */
+               return false;
        }
 
-       /*
-        * Now compute and insert any defaults available for the columns not
-        * provided by the input data.  Anything not processed here or above will
-        * remain NULL.
-        */
-       for (i = 0; i < num_defaults; i++)
+       if (fld_count == -1)
        {
                /*
-                * The caller must supply econtext and have switched into the
-                * per-tuple memory context in it.
+                * Received EOF marker.  Wait for the protocol-level EOF, and complain
+                * if it doesn't come immediately.  In COPY FROM STDIN, this ensures
+                * that we correctly handle CopyFail, if client chooses to send that
+                * now.  When copying from file, we could ignore the rest of the file
+                * like in text mode, but we choose to be consistent with the COPY
+                * FROM STDIN case.
                 */
-               Assert(econtext != NULL);
-               Assert(CurrentMemoryContext == econtext->ecxt_per_tuple_memory);
+               char            dummy;
 
-               values[defmap[i]] = ExecEvalExpr(defexprs[defmap[i]], econtext,
-                                                                                &nulls[defmap[i]]);
+               if (CopyReadBinaryData(cstate, &dummy, 1) > 0)
+                       ereport(ERROR,
+                                       (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
+                                        errmsg("received copy data after EOF marker")));
+               return false;
+       }
+
+       if (fld_count != attr_count)
+               ereport(ERROR,
+                               (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
+                                errmsg("row field count is %d, expected %d",
+                                               (int) fld_count, attr_count)));
+
+       foreach(cur, cstate->attnumlist)
+       {
+               int                     attnum = lfirst_int(cur);
+               int                     m = attnum - 1;
+               Form_pg_attribute att = TupleDescAttr(tupDesc, m);
+
+               cstate->cur_attname = NameStr(att->attname);
+               values[m] = CopyReadBinaryAttribute(cstate,
+                                                                                       &in_functions[m],
+                                                                                       typioparams[m],
+                                                                                       att->atttypmod,
+                                                                                       &nulls[m]);
+               cstate->cur_attname = NULL;
        }
 
        return true;
@@ -1087,7 +1137,7 @@ NextCopyFrom(CopyFromState cstate, ExprContext *econtext,
  * in the final value of line_buf.
  */
 static bool
-CopyReadLine(CopyFromState cstate)
+CopyReadLine(CopyFromState cstate, bool is_csv)
 {
        bool            result;
 
@@ -1095,7 +1145,7 @@ CopyReadLine(CopyFromState cstate)
        cstate->line_buf_valid = false;
 
        /* Parse data and transfer into line_buf */
-       result = CopyReadLineText(cstate);
+       result = CopyReadLineText(cstate, is_csv);
 
        if (result)
        {
@@ -1163,7 +1213,7 @@ CopyReadLine(CopyFromState cstate)
  * CopyReadLineText - inner loop of CopyReadLine for text mode
  */
 static bool
-CopyReadLineText(CopyFromState cstate)
+CopyReadLineText(CopyFromState cstate, bool is_csv)
 {
        char       *copy_input_buf;
        int                     input_buf_ptr;
@@ -1178,7 +1228,7 @@ CopyReadLineText(CopyFromState cstate)
        char            quotec = '\0';
        char            escapec = '\0';
 
-       if (cstate->opts.csv_mode)
+       if (is_csv)
        {
                quotec = cstate->opts.quote[0];
                escapec = cstate->opts.escape[0];
@@ -1255,7 +1305,7 @@ CopyReadLineText(CopyFromState cstate)
                prev_raw_ptr = input_buf_ptr;
                c = copy_input_buf[input_buf_ptr++];
 
-               if (cstate->opts.csv_mode)
+               if (is_csv)
                {
                        /*
                         * If character is '\r', we may need to look ahead below.  Force
@@ -1294,7 +1344,7 @@ CopyReadLineText(CopyFromState cstate)
                }
 
                /* Process \r */
-               if (c == '\r' && (!cstate->opts.csv_mode || !in_quote))
+               if (c == '\r' && (!is_csv || !in_quote))
                {
                        /* Check for \r\n on first line, _and_ handle \r\n. */
                        if (cstate->eol_type == EOL_UNKNOWN ||
@@ -1322,10 +1372,10 @@ CopyReadLineText(CopyFromState cstate)
                                        if (cstate->eol_type == EOL_CRNL)
                                                ereport(ERROR,
                                                                (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
-                                                                !cstate->opts.csv_mode ?
+                                                                !is_csv ?
                                                                 errmsg("literal carriage return found in data") :
                                                                 errmsg("unquoted carriage return found in data"),
-                                                                !cstate->opts.csv_mode ?
+                                                                !is_csv ?
                                                                 errhint("Use \"\\r\" to represent carriage return.") :
                                                                 errhint("Use quoted CSV field to represent carriage return.")));
 
@@ -1339,10 +1389,10 @@ CopyReadLineText(CopyFromState cstate)
                        else if (cstate->eol_type == EOL_NL)
                                ereport(ERROR,
                                                (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
-                                                !cstate->opts.csv_mode ?
+                                                !is_csv ?
                                                 errmsg("literal carriage return found in data") :
                                                 errmsg("unquoted carriage return found in data"),
-                                                !cstate->opts.csv_mode ?
+                                                !is_csv ?
                                                 errhint("Use \"\\r\" to represent carriage return.") :
                                                 errhint("Use quoted CSV field to represent carriage return.")));
                        /* If reach here, we have found the line terminator */
@@ -1350,15 +1400,15 @@ CopyReadLineText(CopyFromState cstate)
                }
 
                /* Process \n */
-               if (c == '\n' && (!cstate->opts.csv_mode || !in_quote))
+               if (c == '\n' && (!is_csv || !in_quote))
                {
                        if (cstate->eol_type == EOL_CR || cstate->eol_type == EOL_CRNL)
                                ereport(ERROR,
                                                (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
-                                                !cstate->opts.csv_mode ?
+                                                !is_csv ?
                                                 errmsg("literal newline found in data") :
                                                 errmsg("unquoted newline found in data"),
-                                                !cstate->opts.csv_mode ?
+                                                !is_csv ?
                                                 errhint("Use \"\\n\" to represent newline.") :
                                                 errhint("Use quoted CSV field to represent newline.")));
                        cstate->eol_type = EOL_NL;      /* in case not set yet */
@@ -1370,7 +1420,7 @@ CopyReadLineText(CopyFromState cstate)
                 * Process backslash, except in CSV mode where backslash is a normal
                 * character.
                 */
-               if (c == '\\' && !cstate->opts.csv_mode)
+               if (c == '\\' && !is_csv)
                {
                        char            c2;
 
index 06dfdfef7210c7ff3ef0f6e8a4d56710ded2f1ae..7bc044e2816e438e8c7bc8df5e5192670bdf0203 100644 (file)
@@ -107,8 +107,6 @@ extern CopyFromState BeginCopyFrom(ParseState *pstate, Relation rel, Node *where
 extern void EndCopyFrom(CopyFromState cstate);
 extern bool NextCopyFrom(CopyFromState cstate, ExprContext *econtext,
                                                 Datum *values, bool *nulls);
-extern bool NextCopyFromRawFields(CopyFromState cstate,
-                                                                 char ***fields, int *nfields);
 extern void CopyFromErrorCallback(void *arg);
 extern char *CopyLimitPrintoutLength(const char *str);
 
index bd2d386816e67bc91cf04ddf7112b7252fe6ba0b..2a2d2f9876bafc24f3e33f30f37611a05f1753a8 100644 (file)
@@ -1,7 +1,7 @@
 /*-------------------------------------------------------------------------
  *
  * copyapi.h
- *       API for COPY TO handlers
+ *       API for COPY TO/FROM handlers
  *
  *
  * Portions Copyright (c) 1996-2025, PostgreSQL Global Development Group
@@ -54,4 +54,52 @@ typedef struct CopyToRoutine
        void            (*CopyToEnd) (CopyToState cstate);
 } CopyToRoutine;
 
+/*
+ * API structure for a COPY FROM format implementation. Note this must be
+ * allocated in a server-lifetime manner, typically as a static const struct.
+ */
+typedef struct CopyFromRoutine
+{
+       /*
+        * Set input function information. This callback is called once at the
+        * beginning of COPY FROM.
+        *
+        * 'finfo' can be optionally filled to provide the catalog information of
+        * the input function.
+        *
+        * 'typioparam' can be optionally filled to define the OID of the type to
+        * pass to the input function. 'atttypid' is the OID of the data type
+        * used by the relation's attribute.
+        */
+       void            (*CopyFromInFunc) (CopyFromState cstate, Oid atttypid,
+                                                                  FmgrInfo *finfo, Oid *typioparam);
+
+       /*
+        * Start a COPY FROM. This callback is called once at the beginning of
+        * COPY FROM.
+        *
+        * 'tupDesc' is the tuple descriptor of the relation where the data needs
+        * to be copied. This can be used for any initialization steps required by
+        * a format.
+        */
+       void            (*CopyFromStart) (CopyFromState cstate, TupleDesc tupDesc);
+
+       /*
+        * Read one row from the source and fill *values and *nulls.
+        *
+        * 'econtext' is used to evaluate the default expression for each column
+        * that is either not read from the file or is using the DEFAULT option
+        * of COPY FROM. It is NULL if no default values are used.
+        *
+        * Returns false if there are no more tuples to read.
+        */
+       bool            (*CopyFromOneRow) (CopyFromState cstate, ExprContext *econtext,
+                                                                  Datum *values, bool *nulls);
+
+       /*
+        * End a COPY FROM. This callback is called once at the end of COPY FROM.
+        */
+       void            (*CopyFromEnd) (CopyFromState cstate);
+} CopyFromRoutine;
+
 #endif                                                 /* COPYAPI_H */
index 1d8ac8f62e638d4566232c9718860670380fbee6..c8b22af22d8521c1c9aa923f047741203a757554 100644 (file)
@@ -58,6 +58,9 @@ typedef enum CopyInsertMethod
  */
 typedef struct CopyFromStateData
 {
+       /* format routine */
+       const struct CopyFromRoutine *routine;
+
        /* low-level state data */
        CopySource      copy_src;               /* type of copy source */
        FILE       *copy_file;          /* used if copy_src == COPY_FILE */
@@ -183,4 +186,12 @@ typedef struct CopyFromStateData
 extern void ReceiveCopyBegin(CopyFromState cstate);
 extern void ReceiveCopyBinaryHeader(CopyFromState cstate);
 
+/* One-row callbacks for built-in formats defined in copyfromparse.c */
+extern bool CopyFromTextOneRow(CopyFromState cstate, ExprContext *econtext,
+                                                          Datum *values, bool *nulls);
+extern bool CopyFromCSVOneRow(CopyFromState cstate, ExprContext *econtext,
+                                                         Datum *values, bool *nulls);
+extern bool CopyFromBinaryOneRow(CopyFromState cstate, ExprContext *econtext,
+                                                                Datum *values, bool *nulls);
+
 #endif                                                 /* COPYFROM_INTERNAL_H */
index fcb968e1ffe6d244260ffe37c83035fcafa31d68..56989aa0b8414aba4cb06a702a927a72f708175b 100644 (file)
@@ -501,6 +501,7 @@ ConvertRowtypeExpr
 CookedConstraint
 CopyDest
 CopyFormatOptions
+CopyFromRoutine
 CopyFromState
 CopyFromStateData
 CopyHeaderChoice