]> git.ipfire.org Git - thirdparty/postgresql.git/commitdiff
pg_basebackup: Refactor code for reading COPY and tar data.
authorRobert Haas <rhaas@postgresql.org>
Thu, 5 Dec 2019 20:14:09 +0000 (15:14 -0500)
committerRobert Haas <rhaas@postgresql.org>
Thu, 5 Dec 2019 20:14:09 +0000 (15:14 -0500)
Add a new function ReceiveCopyData that does just that, taking a
callback as an argument to specify what should be done with each chunk
as it is received. This allows a single copy of the logic to be shared
between ReceiveTarFile and ReceiveAndUnpackTarFile, and eliminates
a few #ifdef conditions based on HAVE_LIBZ.

While this is slightly more code, it's arguably clearer, and
there is a pending patch that introduces additional calls to
ReceiveCopyData.

This commit is not intended to result in any functional change.

Discussion: http://postgr.es/m/CA+TgmoYZDTHbSpwZtW=JDgAhwVAYvmdSrRUjOd+AYdfNNXVBDg@mail.gmail.com

src/bin/pg_basebackup/pg_basebackup.c

index a9d162a7da2dc47627a1e70b47be437829349846..16886fbe71e5c2c27f7cb51a135577ae881b8cb9 100644 (file)
@@ -57,6 +57,40 @@ typedef struct TablespaceList
        TablespaceListCell *tail;
 } TablespaceList;
 
+typedef struct WriteTarState
+{
+       int                     tablespacenum;
+       char            filename[MAXPGPATH];
+       FILE       *tarfile;
+       char            tarhdr[512];
+       bool            basetablespace;
+       bool            in_tarhdr;
+       bool            skip_file;
+       bool            is_recovery_guc_supported;
+       bool            is_postgresql_auto_conf;
+       bool            found_postgresql_auto_conf;
+       int                     file_padding_len;
+       size_t          tarhdrsz;
+       pgoff_t         filesz;
+#ifdef HAVE_LIBZ
+       gzFile          ztarfile;
+#endif
+} WriteTarState;
+
+typedef struct UnpackTarState
+{
+       int                     tablespacenum;
+       char            current_path[MAXPGPATH];
+       char            filename[MAXPGPATH];
+       const char *mapped_tblspc_path;
+       pgoff_t         current_len_left;
+       int                     current_padding;
+       FILE       *file;
+} UnpackTarState;
+
+typedef void (*WriteDataCallback) (size_t nbytes, char *buf,
+                                                                  void *callback_data);
+
 /*
  * pg_xlog has been renamed to pg_wal in version 10.  This version number
  * should be compared with PQserverVersion().
@@ -142,7 +176,10 @@ static void verify_dir_is_empty_or_create(char *dirname, bool *created, bool *fo
 static void progress_report(int tablespacenum, const char *filename, bool force);
 
 static void ReceiveTarFile(PGconn *conn, PGresult *res, int rownum);
+static void ReceiveTarCopyChunk(size_t r, char *copybuf, void *callback_data);
 static void ReceiveAndUnpackTarFile(PGconn *conn, PGresult *res, int rownum);
+static void ReceiveTarAndUnpackCopyChunk(size_t r, char *copybuf,
+                                                                                void *callback_data);
 static void BaseBackup(void);
 
 static bool reached_end_position(XLogRecPtr segendpos, uint32 timeline,
@@ -873,43 +910,79 @@ parse_max_rate(char *src)
        return (int32) result;
 }
 
+/*
+ * Read a stream of COPY data and invoke the provided callback for each
+ * chunk.
+ */
+static void
+ReceiveCopyData(PGconn *conn, WriteDataCallback callback,
+                               void *callback_data)
+{
+       PGresult   *res;
+
+       /* Get the COPY data stream. */
+       res = PQgetResult(conn);
+       if (PQresultStatus(res) != PGRES_COPY_OUT)
+       {
+               pg_log_error("could not get COPY data stream: %s",
+                                        PQerrorMessage(conn));
+               exit(1);
+       }
+       PQclear(res);
+
+       /* Loop over chunks until done. */
+       while (1)
+       {
+               int                     r;
+               char       *copybuf;
+
+               r = PQgetCopyData(conn, &copybuf, 0);
+               if (r == -1)
+               {
+                       /* End of chunk. */
+                       break;
+               }
+               else if (r == -2)
+               {
+                       pg_log_error("could not read COPY data: %s",
+                                                PQerrorMessage(conn));
+                       exit(1);
+               }
+
+               (*callback) (r, copybuf, callback_data);
+
+               PQfreemem(copybuf);
+       }
+}
+
 /*
  * Write a piece of tar data
  */
 static void
-writeTarData(
-#ifdef HAVE_LIBZ
-                        gzFile ztarfile,
-#endif
-                        FILE *tarfile, char *buf, int r, char *current_file)
+writeTarData(WriteTarState *state, char *buf, int r)
 {
 #ifdef HAVE_LIBZ
-       if (ztarfile != NULL)
+       if (state->ztarfile != NULL)
        {
-               if (gzwrite(ztarfile, buf, r) != r)
+               if (gzwrite(state->ztarfile, buf, r) != r)
                {
                        pg_log_error("could not write to compressed file \"%s\": %s",
-                                                current_file, get_gz_error(ztarfile));
+                                                state->filename, get_gz_error(state->ztarfile));
                        exit(1);
                }
        }
        else
 #endif
        {
-               if (fwrite(buf, r, 1, tarfile) != 1)
+               if (fwrite(buf, r, 1, state->tarfile) != 1)
                {
-                       pg_log_error("could not write to file \"%s\": %m", current_file);
+                       pg_log_error("could not write to file \"%s\": %m",
+                                                state->filename);
                        exit(1);
                }
        }
 }
 
-#ifdef HAVE_LIBZ
-#define WRITE_TAR_DATA(buf, sz) writeTarData(ztarfile, tarfile, buf, sz, filename)
-#else
-#define WRITE_TAR_DATA(buf, sz) writeTarData(tarfile, buf, sz, filename)
-#endif
-
 /*
  * Receive a tar format file from the connection to the server, and write
  * the data from this file directly into a tar file. If compression is
@@ -923,29 +996,19 @@ writeTarData(
 static void
 ReceiveTarFile(PGconn *conn, PGresult *res, int rownum)
 {
-       char            filename[MAXPGPATH];
-       char       *copybuf = NULL;
-       FILE       *tarfile = NULL;
-       char            tarhdr[512];
-       bool            basetablespace = PQgetisnull(res, rownum, 0);
-       bool            in_tarhdr = true;
-       bool            skip_file = false;
-       bool            is_recovery_guc_supported = true;
-       bool            is_postgresql_auto_conf = false;
-       bool            found_postgresql_auto_conf = false;
-       int                     file_padding_len = 0;
-       size_t          tarhdrsz = 0;
-       pgoff_t         filesz = 0;
+       char            zerobuf[1024];
+       WriteTarState state;
 
-#ifdef HAVE_LIBZ
-       gzFile          ztarfile = NULL;
-#endif
+       memset(&state, 0, sizeof(state));
+       state.tablespacenum = rownum;
+       state.basetablespace = PQgetisnull(res, rownum, 0);
+       state.in_tarhdr = true;
 
        /* recovery.conf is integrated into postgresql.conf in 12 and newer */
-       if (PQserverVersion(conn) < MINIMUM_VERSION_FOR_RECOVERY_GUC)
-               is_recovery_guc_supported = false;
+       if (PQserverVersion(conn) >= MINIMUM_VERSION_FOR_RECOVERY_GUC)
+               state.is_recovery_guc_supported = true;
 
-       if (basetablespace)
+       if (state.basetablespace)
        {
                /*
                 * Base tablespaces
@@ -959,40 +1022,42 @@ ReceiveTarFile(PGconn *conn, PGresult *res, int rownum)
 #ifdef HAVE_LIBZ
                        if (compresslevel != 0)
                        {
-                               ztarfile = gzdopen(dup(fileno(stdout)), "wb");
-                               if (gzsetparams(ztarfile, compresslevel,
+                               state.ztarfile = gzdopen(dup(fileno(stdout)), "wb");
+                               if (gzsetparams(state.ztarfile, compresslevel,
                                                                Z_DEFAULT_STRATEGY) != Z_OK)
                                {
                                        pg_log_error("could not set compression level %d: %s",
-                                                                compresslevel, get_gz_error(ztarfile));
+                                                                compresslevel, get_gz_error(state.ztarfile));
                                        exit(1);
                                }
                        }
                        else
 #endif
-                               tarfile = stdout;
-                       strcpy(filename, "-");
+                               state.tarfile = stdout;
+                       strcpy(state.filename, "-");
                }
                else
                {
 #ifdef HAVE_LIBZ
                        if (compresslevel != 0)
                        {
-                               snprintf(filename, sizeof(filename), "%s/base.tar.gz", basedir);
-                               ztarfile = gzopen(filename, "wb");
-                               if (gzsetparams(ztarfile, compresslevel,
+                               snprintf(state.filename, sizeof(state.filename),
+                                                "%s/base.tar.gz", basedir);
+                               state.ztarfile = gzopen(state.filename, "wb");
+                               if (gzsetparams(state.ztarfile, compresslevel,
                                                                Z_DEFAULT_STRATEGY) != Z_OK)
                                {
                                        pg_log_error("could not set compression level %d: %s",
-                                                                compresslevel, get_gz_error(ztarfile));
+                                                                compresslevel, get_gz_error(state.ztarfile));
                                        exit(1);
                                }
                        }
                        else
 #endif
                        {
-                               snprintf(filename, sizeof(filename), "%s/base.tar", basedir);
-                               tarfile = fopen(filename, "wb");
+                               snprintf(state.filename, sizeof(state.filename),
+                                                "%s/base.tar", basedir);
+                               state.tarfile = fopen(state.filename, "wb");
                        }
                }
        }
@@ -1004,34 +1069,35 @@ ReceiveTarFile(PGconn *conn, PGresult *res, int rownum)
 #ifdef HAVE_LIBZ
                if (compresslevel != 0)
                {
-                       snprintf(filename, sizeof(filename), "%s/%s.tar.gz", basedir,
-                                        PQgetvalue(res, rownum, 0));
-                       ztarfile = gzopen(filename, "wb");
-                       if (gzsetparams(ztarfile, compresslevel,
+                       snprintf(state.filename, sizeof(state.filename),
+                                        "%s/%s.tar.gz",
+                                        basedir, PQgetvalue(res, rownum, 0));
+                       state.ztarfile = gzopen(state.filename, "wb");
+                       if (gzsetparams(state.ztarfile, compresslevel,
                                                        Z_DEFAULT_STRATEGY) != Z_OK)
                        {
                                pg_log_error("could not set compression level %d: %s",
-                                                        compresslevel, get_gz_error(ztarfile));
+                                                        compresslevel, get_gz_error(state.ztarfile));
                                exit(1);
                        }
                }
                else
 #endif
                {
-                       snprintf(filename, sizeof(filename), "%s/%s.tar", basedir,
-                                        PQgetvalue(res, rownum, 0));
-                       tarfile = fopen(filename, "wb");
+                       snprintf(state.filename, sizeof(state.filename), "%s/%s.tar",
+                                        basedir, PQgetvalue(res, rownum, 0));
+                       state.tarfile = fopen(state.filename, "wb");
                }
        }
 
 #ifdef HAVE_LIBZ
        if (compresslevel != 0)
        {
-               if (!ztarfile)
+               if (!state.ztarfile)
                {
                        /* Compression is in use */
                        pg_log_error("could not create compressed file \"%s\": %s",
-                                                filename, get_gz_error(ztarfile));
+                                                state.filename, get_gz_error(state.ztarfile));
                        exit(1);
                }
        }
@@ -1039,314 +1105,292 @@ ReceiveTarFile(PGconn *conn, PGresult *res, int rownum)
 #endif
        {
                /* Either no zlib support, or zlib support but compresslevel = 0 */
-               if (!tarfile)
+               if (!state.tarfile)
                {
-                       pg_log_error("could not create file \"%s\": %m", filename);
+                       pg_log_error("could not create file \"%s\": %m", state.filename);
                        exit(1);
                }
        }
 
+       ReceiveCopyData(conn, ReceiveTarCopyChunk, &state);
+
        /*
-        * Get the COPY data stream
+        * End of copy data. If requested, and this is the base tablespace, write
+        * configuration file into the tarfile. When done, close the file (but not
+        * stdout).
+        *
+        * Also, write two completely empty blocks at the end of the tar file, as
+        * required by some tar programs.
         */
-       res = PQgetResult(conn);
-       if (PQresultStatus(res) != PGRES_COPY_OUT)
-       {
-               pg_log_error("could not get COPY data stream: %s",
-                                        PQerrorMessage(conn));
-               exit(1);
-       }
 
-       while (1)
+       MemSet(zerobuf, 0, sizeof(zerobuf));
+
+       if (state.basetablespace && writerecoveryconf)
        {
-               int                     r;
+               char            header[512];
 
-               if (copybuf != NULL)
+               /*
+                * If postgresql.auto.conf has not been found in the streamed data,
+                * add recovery configuration to postgresql.auto.conf if recovery
+                * parameters are GUCs.  If the instance connected to is older than
+                * 12, create recovery.conf with this data otherwise.
+                */
+               if (!state.found_postgresql_auto_conf || !state.is_recovery_guc_supported)
                {
-                       PQfreemem(copybuf);
-                       copybuf = NULL;
+                       int                     padding;
+
+                       tarCreateHeader(header,
+                                                       state.is_recovery_guc_supported ? "postgresql.auto.conf" : "recovery.conf",
+                                                       NULL,
+                                                       recoveryconfcontents->len,
+                                                       pg_file_create_mode, 04000, 02000,
+                                                       time(NULL));
+
+                       padding = ((recoveryconfcontents->len + 511) & ~511) - recoveryconfcontents->len;
+
+                       writeTarData(&state, header, sizeof(header));
+                       writeTarData(&state, recoveryconfcontents->data,
+                                                recoveryconfcontents->len);
+                       if (padding)
+                               writeTarData(&state, zerobuf, padding);
                }
 
-               r = PQgetCopyData(conn, &copybuf, 0);
-               if (r == -1)
+               /*
+                * standby.signal is supported only if recovery parameters are GUCs.
+                */
+               if (state.is_recovery_guc_supported)
                {
-                       /*
-                        * End of chunk. If requested, and this is the base tablespace,
-                        * write configuration file into the tarfile. When done, close the
-                        * file (but not stdout).
-                        *
-                        * Also, write two completely empty blocks at the end of the tar
-                        * file, as required by some tar programs.
-                        */
-                       char            zerobuf[1024];
+                       tarCreateHeader(header, "standby.signal", NULL,
+                                                       0,      /* zero-length file */
+                                                       pg_file_create_mode, 04000, 02000,
+                                                       time(NULL));
+
+                       writeTarData(&state, header, sizeof(header));
+                       writeTarData(&state, zerobuf, 511);
+               }
+       }
 
-                       MemSet(zerobuf, 0, sizeof(zerobuf));
+       /* 2 * 512 bytes empty data at end of file */
+       writeTarData(&state, zerobuf, sizeof(zerobuf));
 
-                       if (basetablespace && writerecoveryconf)
+#ifdef HAVE_LIBZ
+       if (state.ztarfile != NULL)
+       {
+               if (gzclose(state.ztarfile) != 0)
+               {
+                       pg_log_error("could not close compressed file \"%s\": %s",
+                                                state.filename, get_gz_error(state.ztarfile));
+                       exit(1);
+               }
+       }
+       else
+#endif
+       {
+               if (strcmp(basedir, "-") != 0)
+               {
+                       if (fclose(state.tarfile) != 0)
                        {
-                               char            header[512];
+                               pg_log_error("could not close file \"%s\": %m",
+                                                        state.filename);
+                               exit(1);
+                       }
+               }
+       }
 
-                               /*
-                                * If postgresql.auto.conf has not been found in the streamed
-                                * data, add recovery configuration to postgresql.auto.conf if
-                                * recovery parameters are GUCs.  If the instance connected to
-                                * is older than 12, create recovery.conf with this data
-                                * otherwise.
-                                */
-                               if (!found_postgresql_auto_conf || !is_recovery_guc_supported)
-                               {
-                                       int                     padding;
+       progress_report(rownum, state.filename, true);
 
-                                       tarCreateHeader(header,
-                                                                       is_recovery_guc_supported ? "postgresql.auto.conf" : "recovery.conf",
-                                                                       NULL,
-                                                                       recoveryconfcontents->len,
-                                                                       pg_file_create_mode, 04000, 02000,
-                                                                       time(NULL));
+       /*
+        * Do not sync the resulting tar file yet, all files are synced once at
+        * the end.
+        */
+}
 
-                                       padding = ((recoveryconfcontents->len + 511) & ~511) - recoveryconfcontents->len;
+/*
+ * Receive one chunk of tar-format data from the server.
+ */
+static void
+ReceiveTarCopyChunk(size_t r, char *copybuf, void *callback_data)
+{
+       WriteTarState *state = callback_data;
 
-                                       WRITE_TAR_DATA(header, sizeof(header));
-                                       WRITE_TAR_DATA(recoveryconfcontents->data,
-                                                                  recoveryconfcontents->len);
-                                       if (padding)
-                                               WRITE_TAR_DATA(zerobuf, padding);
-                               }
+       if (!writerecoveryconf || !state->basetablespace)
+       {
+               /*
+                * When not writing config file, or when not working on the base
+                * tablespace, we never have to look for an existing configuration
+                * file in the stream.
+                */
+               writeTarData(state, copybuf, r);
+       }
+       else
+       {
+               /*
+                * Look for a config file in the existing tar stream. If it's there,
+                * we must skip it so we can later overwrite it with our own version
+                * of the file.
+                *
+                * To do this, we have to process the individual files inside the TAR
+                * stream. The stream consists of a header and zero or more chunks,
+                * all 512 bytes long. The stream from the server is broken up into
+                * smaller pieces, so we have to track the size of the files to find
+                * the next header structure.
+                */
+               int                     rr = r;
+               int                     pos = 0;
 
+               while (rr > 0)
+               {
+                       if (state->in_tarhdr)
+                       {
                                /*
-                                * standby.signal is supported only if recovery parameters are
-                                * GUCs.
+                                * We're currently reading a header structure inside the TAR
+                                * stream, i.e. the file metadata.
                                 */
-                               if (is_recovery_guc_supported)
+                               if (state->tarhdrsz < 512)
                                {
-                                       tarCreateHeader(header, "standby.signal", NULL,
-                                                                       0,      /* zero-length file */
-                                                                       pg_file_create_mode, 04000, 02000,
-                                                                       time(NULL));
+                                       /*
+                                        * Copy the header structure into tarhdr in case the
+                                        * header is not aligned to 512 bytes or it's not returned
+                                        * in whole by the last PQgetCopyData call.
+                                        */
+                                       int                     hdrleft;
+                                       int                     bytes2copy;
 
-                                       WRITE_TAR_DATA(header, sizeof(header));
-                                       WRITE_TAR_DATA(zerobuf, 511);
-                               }
-                       }
+                                       hdrleft = 512 - state->tarhdrsz;
+                                       bytes2copy = (rr > hdrleft ? hdrleft : rr);
 
-                       /* 2 * 512 bytes empty data at end of file */
-                       WRITE_TAR_DATA(zerobuf, sizeof(zerobuf));
+                                       memcpy(&state->tarhdr[state->tarhdrsz], copybuf + pos,
+                                                  bytes2copy);
 
-#ifdef HAVE_LIBZ
-                       if (ztarfile != NULL)
-                       {
-                               if (gzclose(ztarfile) != 0)
-                               {
-                                       pg_log_error("could not close compressed file \"%s\": %s",
-                                                                filename, get_gz_error(ztarfile));
-                                       exit(1);
+                                       rr -= bytes2copy;
+                                       pos += bytes2copy;
+                                       state->tarhdrsz += bytes2copy;
                                }
-                       }
-                       else
-#endif
-                       {
-                               if (strcmp(basedir, "-") != 0)
-                               {
-                                       if (fclose(tarfile) != 0)
-                                       {
-                                               pg_log_error("could not close file \"%s\": %m",
-                                                                        filename);
-                                               exit(1);
-                                       }
-                               }
-                       }
-
-                       break;
-               }
-               else if (r == -2)
-               {
-                       pg_log_error("could not read COPY data: %s",
-                                                PQerrorMessage(conn));
-                       exit(1);
-               }
-
-               if (!writerecoveryconf || !basetablespace)
-               {
-                       /*
-                        * When not writing config file, or when not working on the base
-                        * tablespace, we never have to look for an existing configuration
-                        * file in the stream.
-                        */
-                       WRITE_TAR_DATA(copybuf, r);
-               }
-               else
-               {
-                       /*
-                        * Look for a config file in the existing tar stream. If it's
-                        * there, we must skip it so we can later overwrite it with our
-                        * own version of the file.
-                        *
-                        * To do this, we have to process the individual files inside the
-                        * TAR stream. The stream consists of a header and zero or more
-                        * chunks, all 512 bytes long. The stream from the server is
-                        * broken up into smaller pieces, so we have to track the size of
-                        * the files to find the next header structure.
-                        */
-                       int                     rr = r;
-                       int                     pos = 0;
-
-                       while (rr > 0)
-                       {
-                               if (in_tarhdr)
+                               else
                                {
                                        /*
-                                        * We're currently reading a header structure inside the
-                                        * TAR stream, i.e. the file metadata.
+                                        * We have the complete header structure in tarhdr, look
+                                        * at the file metadata: we may want append recovery info
+                                        * into postgresql.auto.conf and skip standby.signal file
+                                        * if recovery parameters are integrated as GUCs, and
+                                        * recovery.conf otherwise. In both cases we must
+                                        * calculate tar padding.
                                         */
-                                       if (tarhdrsz < 512)
+                                       if (state->is_recovery_guc_supported)
                                        {
-                                               /*
-                                                * Copy the header structure into tarhdr in case the
-                                                * header is not aligned to 512 bytes or it's not
-                                                * returned in whole by the last PQgetCopyData call.
-                                                */
-                                               int                     hdrleft;
-                                               int                     bytes2copy;
-
-                                               hdrleft = 512 - tarhdrsz;
-                                               bytes2copy = (rr > hdrleft ? hdrleft : rr);
-
-                                               memcpy(&tarhdr[tarhdrsz], copybuf + pos, bytes2copy);
-
-                                               rr -= bytes2copy;
-                                               pos += bytes2copy;
-                                               tarhdrsz += bytes2copy;
+                                               state->skip_file =
+                                                       (strcmp(&state->tarhdr[0], "standby.signal") == 0);
+                                               state->is_postgresql_auto_conf =
+                                                       (strcmp(&state->tarhdr[0], "postgresql.auto.conf") == 0);
                                        }
                                        else
-                                       {
-                                               /*
-                                                * We have the complete header structure in tarhdr,
-                                                * look at the file metadata: we may want append
-                                                * recovery info into postgresql.auto.conf and skip
-                                                * standby.signal file if recovery parameters are
-                                                * integrated as GUCs, and recovery.conf otherwise. In
-                                                * both cases we must calculate tar padding.
-                                                */
-                                               if (is_recovery_guc_supported)
-                                               {
-                                                       skip_file = (strcmp(&tarhdr[0], "standby.signal") == 0);
-                                                       is_postgresql_auto_conf = (strcmp(&tarhdr[0], "postgresql.auto.conf") == 0);
-                                               }
-                                               else
-                                                       skip_file = (strcmp(&tarhdr[0], "recovery.conf") == 0);
+                                               state->skip_file =
+                                                       (strcmp(&state->tarhdr[0], "recovery.conf") == 0);
 
-                                               filesz = read_tar_number(&tarhdr[124], 12);
-                                               file_padding_len = ((filesz + 511) & ~511) - filesz;
+                                       state->filesz = read_tar_number(&state->tarhdr[124], 12);
+                                       state->file_padding_len =
+                                               ((state->filesz + 511) & ~511) - state->filesz;
 
-                                               if (is_recovery_guc_supported &&
-                                                       is_postgresql_auto_conf &&
-                                                       writerecoveryconf)
-                                               {
-                                                       /* replace tar header */
-                                                       char            header[512];
+                                       if (state->is_recovery_guc_supported &&
+                                               state->is_postgresql_auto_conf &&
+                                               writerecoveryconf)
+                                       {
+                                               /* replace tar header */
+                                               char            header[512];
 
-                                                       tarCreateHeader(header, "postgresql.auto.conf", NULL,
-                                                                                       filesz + recoveryconfcontents->len,
-                                                                                       pg_file_create_mode, 04000, 02000,
-                                                                                       time(NULL));
+                                               tarCreateHeader(header, "postgresql.auto.conf", NULL,
+                                                                               state->filesz + recoveryconfcontents->len,
+                                                                               pg_file_create_mode, 04000, 02000,
+                                                                               time(NULL));
 
-                                                       WRITE_TAR_DATA(header, sizeof(header));
-                                               }
-                                               else
+                                               writeTarData(state, header, sizeof(header));
+                                       }
+                                       else
+                                       {
+                                               /* copy stream with padding */
+                                               state->filesz += state->file_padding_len;
+
+                                               if (!state->skip_file)
                                                {
-                                                       /* copy stream with padding */
-                                                       filesz += file_padding_len;
-
-                                                       if (!skip_file)
-                                                       {
-                                                               /*
-                                                                * If we're not skipping the file, write the
-                                                                * tar header unmodified.
-                                                                */
-                                                               WRITE_TAR_DATA(tarhdr, 512);
-                                                       }
+                                                       /*
+                                                        * If we're not skipping the file, write the tar
+                                                        * header unmodified.
+                                                        */
+                                                       writeTarData(state, state->tarhdr, 512);
                                                }
-
-                                               /* Next part is the file, not the header */
-                                               in_tarhdr = false;
                                        }
+
+                                       /* Next part is the file, not the header */
+                                       state->in_tarhdr = false;
                                }
-                               else
+                       }
+                       else
+                       {
+                               /*
+                                * We're processing a file's contents.
+                                */
+                               if (state->filesz > 0)
                                {
                                        /*
-                                        * We're processing a file's contents.
+                                        * We still have data to read (and possibly write).
                                         */
-                                       if (filesz > 0)
-                                       {
-                                               /*
-                                                * We still have data to read (and possibly write).
-                                                */
-                                               int                     bytes2write;
+                                       int                     bytes2write;
 
-                                               bytes2write = (filesz > rr ? rr : filesz);
+                                       bytes2write = (state->filesz > rr ? rr : state->filesz);
 
-                                               if (!skip_file)
-                                                       WRITE_TAR_DATA(copybuf + pos, bytes2write);
+                                       if (!state->skip_file)
+                                               writeTarData(state, copybuf + pos, bytes2write);
 
-                                               rr -= bytes2write;
-                                               pos += bytes2write;
-                                               filesz -= bytes2write;
-                                       }
-                                       else if (is_recovery_guc_supported &&
-                                                        is_postgresql_auto_conf &&
-                                                        writerecoveryconf)
-                                       {
-                                               /* append recovery config to postgresql.auto.conf */
-                                               int                     padding;
-                                               int                     tailsize;
+                                       rr -= bytes2write;
+                                       pos += bytes2write;
+                                       state->filesz -= bytes2write;
+                               }
+                               else if (state->is_recovery_guc_supported &&
+                                                state->is_postgresql_auto_conf &&
+                                                writerecoveryconf)
+                               {
+                                       /* append recovery config to postgresql.auto.conf */
+                                       int                     padding;
+                                       int                     tailsize;
 
-                                               tailsize = (512 - file_padding_len) + recoveryconfcontents->len;
-                                               padding = ((tailsize + 511) & ~511) - tailsize;
+                                       tailsize = (512 - state->file_padding_len) + recoveryconfcontents->len;
+                                       padding = ((tailsize + 511) & ~511) - tailsize;
 
-                                               WRITE_TAR_DATA(recoveryconfcontents->data, recoveryconfcontents->len);
+                                       writeTarData(state, recoveryconfcontents->data,
+                                                                recoveryconfcontents->len);
 
-                                               if (padding)
-                                               {
-                                                       char            zerobuf[512];
+                                       if (padding)
+                                       {
+                                               char            zerobuf[512];
 
-                                                       MemSet(zerobuf, 0, sizeof(zerobuf));
-                                                       WRITE_TAR_DATA(zerobuf, padding);
-                                               }
+                                               MemSet(zerobuf, 0, sizeof(zerobuf));
+                                               writeTarData(state, zerobuf, padding);
+                                       }
 
-                                               /* skip original file padding */
-                                               is_postgresql_auto_conf = false;
-                                               skip_file = true;
-                                               filesz += file_padding_len;
+                                       /* skip original file padding */
+                                       state->is_postgresql_auto_conf = false;
+                                       state->skip_file = true;
+                                       state->filesz += state->file_padding_len;
 
-                                               found_postgresql_auto_conf = true;
-                                       }
-                                       else
-                                       {
-                                               /*
-                                                * No more data in the current file, the next piece of
-                                                * data (if any) will be a new file header structure.
-                                                */
-                                               in_tarhdr = true;
-                                               skip_file = false;
-                                               is_postgresql_auto_conf = false;
-                                               tarhdrsz = 0;
-                                               filesz = 0;
-                                       }
+                                       state->found_postgresql_auto_conf = true;
+                               }
+                               else
+                               {
+                                       /*
+                                        * No more data in the current file, the next piece of
+                                        * data (if any) will be a new file header structure.
+                                        */
+                                       state->in_tarhdr = true;
+                                       state->skip_file = false;
+                                       state->is_postgresql_auto_conf = false;
+                                       state->tarhdrsz = 0;
+                                       state->filesz = 0;
                                }
                        }
                }
-               totaldone += r;
-               progress_report(rownum, filename, false);
-       }                                                       /* while (1) */
-       progress_report(rownum, filename, true);
-
-       if (copybuf != NULL)
-               PQfreemem(copybuf);
-
-       /*
-        * Do not sync the resulting tar file yet, all files are synced once at
-        * the end.
-        */
+       }
+       totaldone += r;
+       progress_report(state->tablespacenum, state->filename, false);
 }
 
 
@@ -1384,254 +1428,219 @@ get_tablespace_mapping(const char *dir)
 static void
 ReceiveAndUnpackTarFile(PGconn *conn, PGresult *res, int rownum)
 {
-       char            current_path[MAXPGPATH];
-       char            filename[MAXPGPATH];
-       const char *mapped_tblspc_path;
-       pgoff_t         current_len_left = 0;
-       int                     current_padding = 0;
+       UnpackTarState state;
        bool            basetablespace;
-       char       *copybuf = NULL;
-       FILE       *file = NULL;
+
+       memset(&state, 0, sizeof(state));
+       state.tablespacenum = rownum;
 
        basetablespace = PQgetisnull(res, rownum, 0);
        if (basetablespace)
-               strlcpy(current_path, basedir, sizeof(current_path));
+               strlcpy(state.current_path, basedir, sizeof(state.current_path));
        else
-               strlcpy(current_path,
+               strlcpy(state.current_path,
                                get_tablespace_mapping(PQgetvalue(res, rownum, 1)),
-                               sizeof(current_path));
+                               sizeof(state.current_path));
 
-       /*
-        * Get the COPY data
-        */
-       res = PQgetResult(conn);
-       if (PQresultStatus(res) != PGRES_COPY_OUT)
+       ReceiveCopyData(conn, ReceiveTarAndUnpackCopyChunk, &state);
+
+
+       if (state.file)
+               fclose(state.file);
+
+       progress_report(rownum, state.filename, true);
+
+       if (state.file != NULL)
        {
-               pg_log_error("could not get COPY data stream: %s",
-                                        PQerrorMessage(conn));
+               pg_log_error("COPY stream ended before last file was finished");
                exit(1);
        }
 
-       while (1)
-       {
-               int                     r;
+       if (basetablespace && writerecoveryconf)
+               WriteRecoveryConfig(conn, basedir, recoveryconfcontents);
 
-               if (copybuf != NULL)
-               {
-                       PQfreemem(copybuf);
-                       copybuf = NULL;
-               }
+       /*
+        * No data is synced here, everything is done for all tablespaces at the
+        * end.
+        */
+}
 
-               r = PQgetCopyData(conn, &copybuf, 0);
+static void
+ReceiveTarAndUnpackCopyChunk(size_t r, char *copybuf, void *callback_data)
+{
+       UnpackTarState *state = callback_data;
 
-               if (r == -1)
-               {
-                       /*
-                        * End of chunk
-                        */
-                       if (file)
-                               fclose(file);
+       if (state->file == NULL)
+       {
+#ifndef WIN32
+               int                     filemode;
+#endif
 
-                       break;
-               }
-               else if (r == -2)
+               /*
+                * No current file, so this must be the header for a new file
+                */
+               if (r != 512)
                {
-                       pg_log_error("could not read COPY data: %s",
-                                                PQerrorMessage(conn));
+                       pg_log_error("invalid tar block header size: %zu", r);
                        exit(1);
                }
+               totaldone += 512;
 
-               if (file == NULL)
-               {
-#ifndef WIN32
-                       int                     filemode;
-#endif
-
-                       /*
-                        * No current file, so this must be the header for a new file
-                        */
-                       if (r != 512)
-                       {
-                               pg_log_error("invalid tar block header size: %d", r);
-                               exit(1);
-                       }
-                       totaldone += 512;
-
-                       current_len_left = read_tar_number(&copybuf[124], 12);
+               state->current_len_left = read_tar_number(&copybuf[124], 12);
 
 #ifndef WIN32
-                       /* Set permissions on the file */
-                       filemode = read_tar_number(&copybuf[100], 8);
+               /* Set permissions on the file */
+               filemode = read_tar_number(&copybuf[100], 8);
 #endif
 
-                       /*
-                        * All files are padded up to 512 bytes
-                        */
-                       current_padding =
-                               ((current_len_left + 511) & ~511) - current_len_left;
+               /*
+                * All files are padded up to 512 bytes
+                */
+               state->current_padding =
+                       ((state->current_len_left + 511) & ~511) - state->current_len_left;
 
+               /*
+                * First part of header is zero terminated filename
+                */
+               snprintf(state->filename, sizeof(state->filename),
+                                "%s/%s", state->current_path, copybuf);
+               if (state->filename[strlen(state->filename) - 1] == '/')
+               {
                        /*
-                        * First part of header is zero terminated filename
+                        * Ends in a slash means directory or symlink to directory
                         */
-                       snprintf(filename, sizeof(filename), "%s/%s", current_path,
-                                        copybuf);
-                       if (filename[strlen(filename) - 1] == '/')
+                       if (copybuf[156] == '5')
                        {
                                /*
-                                * Ends in a slash means directory or symlink to directory
+                                * Directory. Remove trailing slash first.
                                 */
-                               if (copybuf[156] == '5')
+                               state->filename[strlen(state->filename) - 1] = '\0';
+                               if (mkdir(state->filename, pg_dir_create_mode) != 0)
                                {
                                        /*
-                                        * Directory
+                                        * When streaming WAL, pg_wal (or pg_xlog for pre-9.6
+                                        * clusters) will have been created by the wal receiver
+                                        * process. Also, when the WAL directory location was
+                                        * specified, pg_wal (or pg_xlog) has already been created
+                                        * as a symbolic link before starting the actual backup.
+                                        * So just ignore creation failures on related
+                                        * directories.
                                         */
-                                       filename[strlen(filename) - 1] = '\0';  /* Remove trailing slash */
-                                       if (mkdir(filename, pg_dir_create_mode) != 0)
+                                       if (!((pg_str_endswith(state->filename, "/pg_wal") ||
+                                                  pg_str_endswith(state->filename, "/pg_xlog") ||
+                                                  pg_str_endswith(state->filename, "/archive_status")) &&
+                                                 errno == EEXIST))
                                        {
-                                               /*
-                                                * When streaming WAL, pg_wal (or pg_xlog for pre-9.6
-                                                * clusters) will have been created by the wal
-                                                * receiver process. Also, when the WAL directory
-                                                * location was specified, pg_wal (or pg_xlog) has
-                                                * already been created as a symbolic link before
-                                                * starting the actual backup. So just ignore creation
-                                                * failures on related directories.
-                                                */
-                                               if (!((pg_str_endswith(filename, "/pg_wal") ||
-                                                          pg_str_endswith(filename, "/pg_xlog") ||
-                                                          pg_str_endswith(filename, "/archive_status")) &&
-                                                         errno == EEXIST))
-                                               {
-                                                       pg_log_error("could not create directory \"%s\": %m",
-                                                                                filename);
-                                                       exit(1);
-                                               }
+                                               pg_log_error("could not create directory \"%s\": %m",
+                                                                        state->filename);
+                                               exit(1);
                                        }
+                               }
 #ifndef WIN32
-                                       if (chmod(filename, (mode_t) filemode))
-                                               pg_log_error("could not set permissions on directory \"%s\": %m",
-                                                                        filename);
+                               if (chmod(state->filename, (mode_t) filemode))
+                                       pg_log_error("could not set permissions on directory \"%s\": %m",
+                                                                state->filename);
 #endif
-                               }
-                               else if (copybuf[156] == '2')
-                               {
-                                       /*
-                                        * Symbolic link
-                                        *
-                                        * It's most likely a link in pg_tblspc directory, to the
-                                        * location of a tablespace. Apply any tablespace mapping
-                                        * given on the command line (--tablespace-mapping). (We
-                                        * blindly apply the mapping without checking that the
-                                        * link really is inside pg_tblspc. We don't expect there
-                                        * to be other symlinks in a data directory, but if there
-                                        * are, you can call it an undocumented feature that you
-                                        * can map them too.)
-                                        */
-                                       filename[strlen(filename) - 1] = '\0';  /* Remove trailing slash */
+                       }
+                       else if (copybuf[156] == '2')
+                       {
+                               /*
+                                * Symbolic link
+                                *
+                                * It's most likely a link in pg_tblspc directory, to the
+                                * location of a tablespace. Apply any tablespace mapping
+                                * given on the command line (--tablespace-mapping). (We
+                                * blindly apply the mapping without checking that the link
+                                * really is inside pg_tblspc. We don't expect there to be
+                                * other symlinks in a data directory, but if there are, you
+                                * can call it an undocumented feature that you can map them
+                                * too.)
+                                */
+                               state->filename[strlen(state->filename) - 1] = '\0';    /* Remove trailing slash */
 
-                                       mapped_tblspc_path = get_tablespace_mapping(&copybuf[157]);
-                                       if (symlink(mapped_tblspc_path, filename) != 0)
-                                       {
-                                               pg_log_error("could not create symbolic link from \"%s\" to \"%s\": %m",
-                                                                        filename, mapped_tblspc_path);
-                                               exit(1);
-                                       }
-                               }
-                               else
+                               state->mapped_tblspc_path =
+                                       get_tablespace_mapping(&copybuf[157]);
+                               if (symlink(state->mapped_tblspc_path, state->filename) != 0)
                                {
-                                       pg_log_error("unrecognized link indicator \"%c\"",
-                                                                copybuf[156]);
+                                       pg_log_error("could not create symbolic link from \"%s\" to \"%s\": %m",
+                                                                state->filename, state->mapped_tblspc_path);
                                        exit(1);
                                }
-                               continue;               /* directory or link handled */
                        }
-
-                       /*
-                        * regular file
-                        */
-                       file = fopen(filename, "wb");
-                       if (!file)
+                       else
                        {
-                               pg_log_error("could not create file \"%s\": %m", filename);
+                               pg_log_error("unrecognized link indicator \"%c\"",
+                                                        copybuf[156]);
                                exit(1);
                        }
+                       return;                         /* directory or link handled */
+               }
+
+               /*
+                * regular file
+                */
+               state->file = fopen(state->filename, "wb");
+               if (!state->file)
+               {
+                       pg_log_error("could not create file \"%s\": %m", state->filename);
+                       exit(1);
+               }
 
 #ifndef WIN32
-                       if (chmod(filename, (mode_t) filemode))
-                               pg_log_error("could not set permissions on file \"%s\": %m",
-                                                        filename);
+               if (chmod(state->filename, (mode_t) filemode))
+                       pg_log_error("could not set permissions on file \"%s\": %m",
+                                                state->filename);
 #endif
 
-                       if (current_len_left == 0)
-                       {
-                               /*
-                                * Done with this file, next one will be a new tar header
-                                */
-                               fclose(file);
-                               file = NULL;
-                               continue;
-                       }
-               }                                               /* new file */
-               else
+               if (state->current_len_left == 0)
                {
                        /*
-                        * Continuing blocks in existing file
+                        * Done with this file, next one will be a new tar header
                         */
-                       if (current_len_left == 0 && r == current_padding)
-                       {
-                               /*
-                                * Received the padding block for this file, ignore it and
-                                * close the file, then move on to the next tar header.
-                                */
-                               fclose(file);
-                               file = NULL;
-                               totaldone += r;
-                               continue;
-                       }
-
-                       if (fwrite(copybuf, r, 1, file) != 1)
-                       {
-                               pg_log_error("could not write to file \"%s\": %m", filename);
-                               exit(1);
-                       }
-                       totaldone += r;
-                       progress_report(rownum, filename, false);
-
-                       current_len_left -= r;
-                       if (current_len_left == 0 && current_padding == 0)
-                       {
-                               /*
-                                * Received the last block, and there is no padding to be
-                                * expected. Close the file and move on to the next tar
-                                * header.
-                                */
-                               fclose(file);
-                               file = NULL;
-                               continue;
-                       }
-               }                                               /* continuing data in existing file */
-       }                                                       /* loop over all data blocks */
-       progress_report(rownum, filename, true);
-
-       if (file != NULL)
+                       fclose(state->file);
+                       state->file = NULL;
+                       return;
+               }
+       }                                                       /* new file */
+       else
        {
-               pg_log_error("COPY stream ended before last file was finished");
-               exit(1);
-       }
-
-       if (copybuf != NULL)
-               PQfreemem(copybuf);
+               /*
+                * Continuing blocks in existing file
+                */
+               if (state->current_len_left == 0 && r == state->current_padding)
+               {
+                       /*
+                        * Received the padding block for this file, ignore it and close
+                        * the file, then move on to the next tar header.
+                        */
+                       fclose(state->file);
+                       state->file = NULL;
+                       totaldone += r;
+                       return;
+               }
 
-       if (basetablespace && writerecoveryconf)
-               WriteRecoveryConfig(conn, basedir, recoveryconfcontents);
+               if (fwrite(copybuf, r, 1, state->file) != 1)
+               {
+                       pg_log_error("could not write to file \"%s\": %m", state->filename);
+                       exit(1);
+               }
+               totaldone += r;
+               progress_report(state->tablespacenum, state->filename, false);
 
-       /*
-        * No data is synced here, everything is done for all tablespaces at the
-        * end.
-        */
+               state->current_len_left -= r;
+               if (state->current_len_left == 0 && state->current_padding == 0)
+               {
+                       /*
+                        * Received the last block, and there is no padding to be
+                        * expected. Close the file and move on to the next tar header.
+                        */
+                       fclose(state->file);
+                       state->file = NULL;
+                       return;
+               }
+       }                                                       /* continuing data in existing file */
 }
 
-
 static void
 BaseBackup(void)
 {