]> git.ipfire.org Git - thirdparty/postgresql.git/commitdiff
pg_waldump: Add support for reading WAL from tar archives
authorAndrew Dunstan <andrew@dunslane.net>
Fri, 20 Mar 2026 19:31:35 +0000 (15:31 -0400)
committerAndrew Dunstan <andrew@dunslane.net>
Fri, 20 Mar 2026 19:31:35 +0000 (15:31 -0400)
pg_waldump can now accept the path to a tar archive (optionally
compressed with gzip, lz4, or zstd) containing WAL files and decode
them.  This was added primarily for pg_verifybackup, which previously
had to skip WAL parsing for tar-format backups.

The implementation uses the existing archive streamer infrastructure
with a hash table to track WAL segments read from the archive.  If WAL
files within the archive are not in sequential order, out-of-order
segments are written to a temporary directory (created via mkdtemp under
$TMPDIR or the archive's directory) and read back when needed.  An
atexit callback ensures the temporary directory is cleaned up.

The --follow option is not supported when reading from a tar archive.

Author: Amul Sul <sulamul@gmail.com>
Reviewed-by: Robert Haas <robertmhaas@gmail.com>
Reviewed-by: Jakub Wartak <jakub.wartak@enterprisedb.com>
Reviewed-by: Chao Li <li.evan.chao@gmail.com>
Reviewed-by: Euler Taveira <euler@eulerto.com>
Reviewed-by: Andrew Dunstan <andrew@dunslane.net>
Reviewed-by: Zsolt Parragi <zsolt.parragi@percona.com>
discussion: https://postgr.es/m/CAAJ_b94bqdWN3h2J-PzzzQ2Npbwct5ZQHggn_QoYGhC2rn-=WQ@mail.gmail.com

doc/src/sgml/ref/pg_waldump.sgml
src/bin/pg_waldump/Makefile
src/bin/pg_waldump/archive_waldump.c [new file with mode: 0644]
src/bin/pg_waldump/meson.build
src/bin/pg_waldump/pg_waldump.c
src/bin/pg_waldump/pg_waldump.h
src/bin/pg_waldump/t/001_basic.pl
src/tools/pgindent/typedefs.list

index d1715ff51242d636b0d99ebbd2a8c8b1c9b60d8a..9bbb4bd5772700ed23b5d98918160753aa2edd56 100644 (file)
@@ -141,13 +141,21 @@ PostgreSQL documentation
       <term><option>--path=<replaceable>path</replaceable></option></term>
       <listitem>
        <para>
-        Specifies a directory to search for WAL segment files or a
-        directory with a <literal>pg_wal</literal> subdirectory that
+        Specifies a tar archive or a directory to search for WAL segment files
+        or a directory with a <literal>pg_wal</literal> subdirectory that
         contains such files.  The default is to search in the current
         directory, the <literal>pg_wal</literal> subdirectory of the
         current directory, and the <literal>pg_wal</literal> subdirectory
         of <envar>PGDATA</envar>.
        </para>
+       <para>
+        If a tar archive is provided and its WAL segment files are not in
+        sequential order, those files will be written to a temporary directory
+        named starting with <filename>waldump_tmp</filename>. This directory will be
+        created inside the directory specified by the <envar>TMPDIR</envar>
+        environment variable if it is set; otherwise, it will be created within
+        the same directory as the tar archive.
+       </para>
       </listitem>
      </varlistentry>
 
@@ -383,6 +391,17 @@ PostgreSQL documentation
      </para>
     </listitem>
    </varlistentry>
+
+   <varlistentry>
+    <term><envar>TMPDIR</envar></term>
+    <listitem>
+     <para>
+      Directory in which to create temporary files when reading WAL from a
+      tar archive with out-of-order segment files. If not set, the temporary
+      directory is created within the same directory as the tar archive.
+     </para>
+    </listitem>
+   </varlistentry>
   </variablelist>
  </refsect1>
 
index 4c1ee649501f44201ba99ef6520b41eda4cbc162..aabb87566a2dc3f7af694483709c211b09737bc8 100644 (file)
@@ -3,6 +3,9 @@
 PGFILEDESC = "pg_waldump - decode and display WAL"
 PGAPPICON=win32
 
+# make these available to TAP test scripts
+export TAR
+
 subdir = src/bin/pg_waldump
 top_builddir = ../../..
 include $(top_builddir)/src/Makefile.global
@@ -10,13 +13,15 @@ include $(top_builddir)/src/Makefile.global
 OBJS = \
        $(RMGRDESCOBJS) \
        $(WIN32RES) \
+       archive_waldump.o \
        compat.o \
        pg_waldump.o \
        rmgrdesc.o \
        xlogreader.o \
        xlogstats.o
 
-override CPPFLAGS := -DFRONTEND $(CPPFLAGS)
+override CPPFLAGS := -DFRONTEND -I$(libpq_srcdir) $(CPPFLAGS)
+LDFLAGS_INTERNAL += -L$(top_builddir)/src/fe_utils -lpgfeutils
 
 RMGRDESCSOURCES = $(sort $(notdir $(wildcard $(top_srcdir)/src/backend/access/rmgrdesc/*desc*.c)))
 RMGRDESCOBJS = $(patsubst %.c,%.o,$(RMGRDESCSOURCES))
diff --git a/src/bin/pg_waldump/archive_waldump.c b/src/bin/pg_waldump/archive_waldump.c
new file mode 100644 (file)
index 0000000..b078c2d
--- /dev/null
@@ -0,0 +1,860 @@
+/*-------------------------------------------------------------------------
+ *
+ * archive_waldump.c
+ *             A generic facility for reading WAL data from tar archives via archive
+ *             streamer.
+ *
+ * Portions Copyright (c) 2026, PostgreSQL Global Development Group
+ *
+ * IDENTIFICATION
+ *             src/bin/pg_waldump/archive_waldump.c
+ *
+ *-------------------------------------------------------------------------
+ */
+
+#include "postgres_fe.h"
+
+#include <unistd.h>
+
+#include "access/xlog_internal.h"
+#include "common/file_perm.h"
+#include "common/hashfn.h"
+#include "common/logging.h"
+#include "fe_utils/simple_list.h"
+#include "pg_waldump.h"
+
+/*
+ * How many bytes should we try to read from a file at once?
+ */
+#define READ_CHUNK_SIZE                                (128 * 1024)
+
+/* Temporary directory for spilled WAL segment files */
+char      *TmpWalSegDir = NULL;
+
+/*
+ * Check if the start segment number is zero; this indicates a request to read
+ * any WAL file.
+ */
+#define READ_ANY_WAL(privateInfo)      ((privateInfo)->start_segno == 0)
+
+/*
+ * Hash entry representing a WAL segment retrieved from the archive.
+ *
+ * While WAL segments are typically read sequentially, individual entries
+ * maintain their own buffers for the following reasons:
+ *
+ * 1. Boundary Handling: The archive streamer provides a continuous byte
+ * stream. A single streaming chunk may contain the end of one WAL segment
+ * and the start of the next. Separate buffers allow us to easily
+ * partition and track these bytes by their respective segments.
+ *
+ * 2. Out-of-Order Support: Dedicated buffers simplify logic when segments
+ * are archived or retrieved out of sequence.
+ *
+ * To minimize the memory footprint, entries and their associated buffers are
+ * freed immediately once consumed. Since pg_waldump does not request the same
+ * bytes twice, a segment is discarded as soon as pg_waldump moves past it.
+ */
+typedef struct ArchivedWALFile
+{
+       uint32          status;                 /* hash status */
+       const char *fname;                      /* hash key: WAL segment name */
+
+       StringInfo      buf;                    /* holds WAL bytes read from archive */
+       bool            spilled;                /* true if the WAL data was spilled to a
+                                                                * temporary file */
+
+       int                     read_len;               /* total bytes received from archive for this
+                                                                * segment, including already-consumed data */
+} ArchivedWALFile;
+
+static uint32 hash_string_pointer(const char *s);
+#define SH_PREFIX                              ArchivedWAL
+#define SH_ELEMENT_TYPE                        ArchivedWALFile
+#define SH_KEY_TYPE                            const char *
+#define SH_KEY                                 fname
+#define SH_HASH_KEY(tb, key)   hash_string_pointer(key)
+#define SH_EQUAL(tb, a, b)             (strcmp(a, b) == 0)
+#define SH_SCOPE                               static inline
+#define SH_RAW_ALLOCATOR               pg_malloc0
+#define SH_DECLARE
+#define SH_DEFINE
+#include "lib/simplehash.h"
+
+typedef struct astreamer_waldump
+{
+       astreamer       base;
+       XLogDumpPrivate *privateInfo;
+} astreamer_waldump;
+
+static ArchivedWALFile *get_archive_wal_entry(const char *fname,
+                                                                                         XLogDumpPrivate *privateInfo);
+static int     read_archive_file(XLogDumpPrivate *privateInfo, Size count);
+static void setup_tmpwal_dir(const char *waldir);
+static void cleanup_tmpwal_dir_atexit(void);
+
+static FILE *prepare_tmp_write(const char *fname, XLogDumpPrivate *privateInfo);
+static void perform_tmp_write(const char *fname, StringInfo buf, FILE *file);
+
+static astreamer *astreamer_waldump_new(XLogDumpPrivate *privateInfo);
+static void astreamer_waldump_content(astreamer *streamer,
+                                                                         astreamer_member *member,
+                                                                         const char *data, int len,
+                                                                         astreamer_archive_context context);
+static void astreamer_waldump_finalize(astreamer *streamer);
+static void astreamer_waldump_free(astreamer *streamer);
+
+static bool member_is_wal_file(astreamer_waldump *mystreamer,
+                                                          astreamer_member *member,
+                                                          char **fname);
+
+static const astreamer_ops astreamer_waldump_ops = {
+       .content = astreamer_waldump_content,
+       .finalize = astreamer_waldump_finalize,
+       .free = astreamer_waldump_free
+};
+
+/*
+ * Initializes the tar archive reader: opens the archive, builds a hash table
+ * for WAL entries, reads ahead until a full WAL page header is available to
+ * determine the WAL segment size, and computes start/end segment numbers for
+ * filtering.
+ */
+void
+init_archive_reader(XLogDumpPrivate *privateInfo,
+                                       pg_compress_algorithm compression)
+{
+       int                     fd;
+       astreamer  *streamer;
+       ArchivedWALFile *entry = NULL;
+       XLogLongPageHeader longhdr;
+       XLogSegNo       segno;
+       TimeLineID      timeline;
+
+       /* Open tar archive and store its file descriptor */
+       fd = open_file_in_directory(privateInfo->archive_dir,
+                                                               privateInfo->archive_name);
+
+       if (fd < 0)
+               pg_fatal("could not open file \"%s\"", privateInfo->archive_name);
+
+       privateInfo->archive_fd = fd;
+
+       streamer = astreamer_waldump_new(privateInfo);
+
+       /* We must first parse the tar archive. */
+       streamer = astreamer_tar_parser_new(streamer);
+
+       /* If the archive is compressed, decompress before parsing. */
+       if (compression == PG_COMPRESSION_GZIP)
+               streamer = astreamer_gzip_decompressor_new(streamer);
+       else if (compression == PG_COMPRESSION_LZ4)
+               streamer = astreamer_lz4_decompressor_new(streamer);
+       else if (compression == PG_COMPRESSION_ZSTD)
+               streamer = astreamer_zstd_decompressor_new(streamer);
+
+       privateInfo->archive_streamer = streamer;
+
+       /*
+        * Allocate a buffer for reading the archive file to facilitate content
+        * decoding; read requests must not exceed the allocated buffer size.
+        */
+       privateInfo->archive_read_buf = pg_malloc(READ_CHUNK_SIZE);
+
+#ifdef USE_ASSERT_CHECKING
+       privateInfo->archive_read_buf_size = READ_CHUNK_SIZE;
+#endif
+
+       /*
+        * Hash table storing WAL entries read from the archive with an arbitrary
+        * initial size.
+        */
+       privateInfo->archive_wal_htab = ArchivedWAL_create(8, NULL);
+
+       /*
+        * Read until we have at least one full WAL page (XLOG_BLCKSZ bytes) from
+        * the first WAL segment in the archive so we can extract the WAL segment
+        * size from the long page header.
+        */
+       while (entry == NULL || entry->buf->len < XLOG_BLCKSZ)
+       {
+               if (read_archive_file(privateInfo, XLOG_BLCKSZ) == 0)
+                       pg_fatal("could not find WAL in archive \"%s\"",
+                                        privateInfo->archive_name);
+
+               entry = privateInfo->cur_file;
+       }
+
+       /* Extract the WAL segment size from the long page header */
+       longhdr = (XLogLongPageHeader) entry->buf->data;
+
+       if (!IsValidWalSegSize(longhdr->xlp_seg_size))
+       {
+               pg_log_error(ngettext("invalid WAL segment size in WAL file from archive \"%s\" (%d byte)",
+                                                         "invalid WAL segment size in WAL file from archive \"%s\" (%d bytes)",
+                                                         longhdr->xlp_seg_size),
+                                        privateInfo->archive_name, longhdr->xlp_seg_size);
+               pg_log_error_detail("The WAL segment size must be a power of two between 1 MB and 1 GB.");
+               exit(1);
+       }
+
+       privateInfo->segsize = longhdr->xlp_seg_size;
+
+       /*
+        * With the WAL segment size available, we can now initialize the
+        * dependent start and end segment numbers.
+        */
+       Assert(!XLogRecPtrIsInvalid(privateInfo->startptr));
+       XLByteToSeg(privateInfo->startptr, privateInfo->start_segno,
+                               privateInfo->segsize);
+
+       if (!XLogRecPtrIsInvalid(privateInfo->endptr))
+               XLByteToSeg(privateInfo->endptr, privateInfo->end_segno,
+                                       privateInfo->segsize);
+
+       /*
+        * This WAL record was fetched before the filtering parameters
+        * (start_segno and end_segno) were fully initialized. Perform the
+        * relevance check against the user-provided range now; if the WAL falls
+        * outside this range, remove it from the hash table. Subsequent WAL will
+        * be filtered automatically by the archive streamer using the updated
+        * start_segno and end_segno values.
+        */
+       XLogFromFileName(entry->fname, &timeline, &segno, privateInfo->segsize);
+       if (privateInfo->timeline != timeline ||
+               privateInfo->start_segno > segno ||
+               privateInfo->end_segno < segno)
+               free_archive_wal_entry(entry->fname, privateInfo);
+}
+
+/*
+ * Release the archive streamer chain and close the archive file.
+ */
+void
+free_archive_reader(XLogDumpPrivate *privateInfo)
+{
+       /*
+        * NB: Normally, astreamer_finalize() is called before astreamer_free() to
+        * flush any remaining buffered data or to ensure the end of the tar
+        * archive is reached.  However, when decoding WAL, once we hit the end
+        * LSN, any remaining buffered data or unread portion of the archive can
+        * be safely ignored.
+        */
+       astreamer_free(privateInfo->archive_streamer);
+
+       /* Free any remaining hash table entries and their buffers. */
+       if (privateInfo->archive_wal_htab != NULL)
+       {
+               ArchivedWAL_iterator iter;
+               ArchivedWALFile *entry;
+
+               ArchivedWAL_start_iterate(privateInfo->archive_wal_htab, &iter);
+               while ((entry = ArchivedWAL_iterate(privateInfo->archive_wal_htab,
+                                                                                       &iter)) != NULL)
+               {
+                       if (entry->buf != NULL)
+                               destroyStringInfo(entry->buf);
+               }
+               ArchivedWAL_destroy(privateInfo->archive_wal_htab);
+               privateInfo->archive_wal_htab = NULL;
+       }
+
+       /* Free the reusable read buffer. */
+       if (privateInfo->archive_read_buf != NULL)
+       {
+               pg_free(privateInfo->archive_read_buf);
+               privateInfo->archive_read_buf = NULL;
+       }
+
+       /* Close the file. */
+       if (close(privateInfo->archive_fd) != 0)
+               pg_log_error("could not close file \"%s\": %m",
+                                        privateInfo->archive_name);
+}
+
+/*
+ * Copies the requested WAL data from the hash entry's buffer into readBuff.
+ * If the buffer does not yet contain the needed bytes, fetches more data from
+ * the tar archive via the archive streamer.
+ */
+int
+read_archive_wal_page(XLogDumpPrivate *privateInfo, XLogRecPtr targetPagePtr,
+                                         Size count, char *readBuff)
+{
+       char       *p = readBuff;
+       Size            nbytes = count;
+       XLogRecPtr      recptr = targetPagePtr;
+       int                     segsize = privateInfo->segsize;
+       XLogSegNo       segno;
+       char            fname[MAXFNAMELEN];
+       ArchivedWALFile *entry;
+
+       /* Identify the segment and locate its entry in the archive hash */
+       XLByteToSeg(targetPagePtr, segno, segsize);
+       XLogFileName(fname, privateInfo->timeline, segno, segsize);
+       entry = get_archive_wal_entry(fname, privateInfo);
+
+       while (nbytes > 0)
+       {
+               char       *buf = entry->buf->data;
+               int                     bufLen = entry->buf->len;
+               XLogRecPtr      endPtr;
+               XLogRecPtr      startPtr;
+
+               /*
+                * Calculate the LSN range currently residing in the buffer.
+                *
+                * read_len tracks total bytes received for this segment (including
+                * already-discarded data), so endPtr is the LSN just past the last
+                * buffered byte, and startPtr is the LSN of the first buffered byte.
+                */
+               XLogSegNoOffsetToRecPtr(segno, entry->read_len, segsize, endPtr);
+               startPtr = endPtr - bufLen;
+
+               /*
+                * Copy the requested WAL record if it exists in the buffer.
+                */
+               if (bufLen > 0 && startPtr <= recptr && recptr < endPtr)
+               {
+                       int                     copyBytes;
+                       int                     offset = recptr - startPtr;
+
+                       /*
+                        * Given startPtr <= recptr < endPtr and a total buffer size
+                        * 'bufLen', the offset (recptr - startPtr) will always be less
+                        * than 'bufLen'.
+                        */
+                       Assert(offset < bufLen);
+
+                       copyBytes = Min(nbytes, bufLen - offset);
+                       memcpy(p, buf + offset, copyBytes);
+
+                       /* Update state for read */
+                       recptr += copyBytes;
+                       nbytes -= copyBytes;
+                       p += copyBytes;
+               }
+               else
+               {
+                       /*
+                        * Before starting the actual decoding loop, pg_waldump tries to
+                        * locate the first valid record from the user-specified start
+                        * position, which might not be the start of a WAL record and
+                        * could fall in the middle of a record that spans multiple pages.
+                        * Consequently, the valid start position the decoder is looking
+                        * for could be far away from that initial position.
+                        *
+                        * This may involve reading across multiple pages, and this
+                        * pre-reading fetches data in multiple rounds from the archive
+                        * streamer; normally, we would throw away existing buffer
+                        * contents to fetch the next set of data, but that existing data
+                        * might be needed once the main loop starts. Because previously
+                        * read data cannot be re-read by the archive streamer, we delay
+                        * resetting the buffer until the main decoding loop is entered.
+                        *
+                        * Once pg_waldump has entered the main loop, it may re-read the
+                        * currently active page, but never an older one; therefore, any
+                        * fully consumed WAL data preceding the current page can then be
+                        * safely discarded.
+                        */
+                       if (privateInfo->decoding_started)
+                       {
+                               resetStringInfo(entry->buf);
+
+                               /*
+                                * Push back the partial page data for the current page to the
+                                * buffer, ensuring a full page remains available for
+                                * re-reading if requested.
+                                */
+                               if (p > readBuff)
+                               {
+                                       Assert((count - nbytes) > 0);
+                                       appendBinaryStringInfo(entry->buf, readBuff, count - nbytes);
+                               }
+                       }
+
+                       /*
+                        * Now, fetch more data.  Raise an error if the archive streamer
+                        * has moved past our segment (meaning the WAL file in the archive
+                        * is shorter than expected) or if reading the archive reached
+                        * EOF.
+                        */
+                       if (privateInfo->cur_file != entry)
+                               pg_fatal("WAL segment \"%s\" in archive \"%s\" is too short: read %lld of %lld bytes",
+                                                fname, privateInfo->archive_name,
+                                                (long long int) (count - nbytes),
+                                                (long long int) count);
+                       if (read_archive_file(privateInfo, READ_CHUNK_SIZE) == 0)
+                               pg_fatal("unexpected end of archive \"%s\" while reading \"%s\": read %lld of %lld bytes",
+                                                privateInfo->archive_name, fname,
+                                                (long long int) (count - nbytes),
+                                                (long long int) count);
+               }
+       }
+
+       /*
+        * Should have successfully read all the requested bytes or reported a
+        * failure before this point.
+        */
+       Assert(nbytes == 0);
+
+       /*
+        * Return count unchanged; the caller expects this convention, matching
+        * the routine that reads WAL pages from physical files.
+        */
+       return count;
+}
+
+/*
+ * Releases the buffer of a WAL entry that is no longer needed, preventing the
+ * accumulation of irrelevant WAL data.  Also removes any associated temporary
+ * file and clears privateInfo->cur_file if it points to this entry, so the
+ * archive streamer skips subsequent data for it.
+ */
+void
+free_archive_wal_entry(const char *fname, XLogDumpPrivate *privateInfo)
+{
+       ArchivedWALFile *entry;
+
+       entry = ArchivedWAL_lookup(privateInfo->archive_wal_htab, fname);
+
+       if (entry == NULL)
+               return;
+
+       /* Destroy the buffer */
+       destroyStringInfo(entry->buf);
+       entry->buf = NULL;
+
+       /* Remove temporary file if any */
+       if (entry->spilled)
+       {
+               char            fpath[MAXPGPATH];
+
+               snprintf(fpath, MAXPGPATH, "%s/%s", TmpWalSegDir, fname);
+
+               if (unlink(fpath) == 0)
+                       pg_log_debug("removed file \"%s\"", fpath);
+       }
+
+       /* Clear cur_file if it points to the entry being freed */
+       if (privateInfo->cur_file == entry)
+               privateInfo->cur_file = NULL;
+
+       ArchivedWAL_delete_item(privateInfo->archive_wal_htab, entry);
+}
+
+/*
+ * Returns the archived WAL entry from the hash table if it already exists.
+ * Otherwise, reads more data from the archive until the requested entry is
+ * found.  If the archive streamer is reading a WAL file from the archive that
+ * is not currently needed, that data is spilled to a temporary file for later
+ * retrieval.
+ */
+static ArchivedWALFile *
+get_archive_wal_entry(const char *fname, XLogDumpPrivate *privateInfo)
+{
+       ArchivedWALFile *entry = NULL;
+       FILE       *write_fp = NULL;
+
+       /*
+        * Search the hash table first. If the entry is found, return it.
+        * Otherwise, the requested WAL entry hasn't been read from the archive
+        * yet; invoke the archive streamer to fetch it.
+        */
+       while (1)
+       {
+               /*
+                * Search hash table.
+                *
+                * We perform the search inside the loop because a single iteration of
+                * the archive reader may decompress and extract multiple files into
+                * the hash table. One of these newly added files could be the one we
+                * are seeking.
+                */
+               entry = ArchivedWAL_lookup(privateInfo->archive_wal_htab, fname);
+
+               if (entry != NULL)
+                       return entry;
+
+               /*
+                * Capture the current entry before calling read_archive_file(),
+                * because cur_file may advance to a new segment during streaming. We
+                * hold this reference so we can flush any remaining buffer data and
+                * close the write handle once we detect that cur_file has moved on.
+                */
+               entry = privateInfo->cur_file;
+
+               /*
+                * Fetch more data either when no current file is being tracked or
+                * when its buffer has been fully flushed to the temporary file.
+                */
+               if (entry == NULL || entry->buf->len == 0)
+               {
+                       if (read_archive_file(privateInfo, READ_CHUNK_SIZE) == 0)
+                               break;                  /* archive file ended */
+               }
+
+               /*
+                * Archive streamer is reading a non-WAL file or an irrelevant WAL
+                * file.
+                */
+               if (entry == NULL)
+                       continue;
+
+               /*
+                * The streamer is producing a WAL segment that isn't the one asked
+                * for; it must be arriving out of order.  Spill its data to disk so
+                * it can be read back when needed.
+                */
+               Assert(strcmp(fname, entry->fname) != 0);
+
+               /* Create a temporary file if one does not already exist */
+               if (!entry->spilled)
+               {
+                       write_fp = prepare_tmp_write(entry->fname, privateInfo);
+                       entry->spilled = true;
+               }
+
+               /* Flush data from the buffer to the file */
+               perform_tmp_write(entry->fname, entry->buf, write_fp);
+               resetStringInfo(entry->buf);
+
+               /*
+                * If cur_file changed since we captured entry above, the archive
+                * streamer has finished this segment and moved on.  Close its spill
+                * file handle so data is flushed to disk before the next segment
+                * starts writing to a different handle.
+                */
+               if (entry != privateInfo->cur_file && write_fp != NULL)
+               {
+                       fclose(write_fp);
+                       write_fp = NULL;
+               }
+       }
+
+       /* Requested WAL segment not found */
+       pg_fatal("could not find WAL \"%s\" in archive \"%s\"",
+                        fname, privateInfo->archive_name);
+}
+
+/*
+ * Reads a chunk from the archive file and passes it through the streamer
+ * pipeline for decompression (if needed) and tar member extraction.
+ */
+static int
+read_archive_file(XLogDumpPrivate *privateInfo, Size count)
+{
+       int                     rc;
+
+       /* The read request must not exceed the allocated buffer size. */
+       Assert(privateInfo->archive_read_buf_size >= count);
+
+       rc = read(privateInfo->archive_fd, privateInfo->archive_read_buf, count);
+       if (rc < 0)
+               pg_fatal("could not read file \"%s\": %m",
+                                privateInfo->archive_name);
+
+       /*
+        * Decompress (if required), and then parse the previously read contents
+        * of the tar file.
+        */
+       if (rc > 0)
+               astreamer_content(privateInfo->archive_streamer, NULL,
+                                                 privateInfo->archive_read_buf, rc,
+                                                 ASTREAMER_UNKNOWN);
+
+       return rc;
+}
+
+/*
+ * Set up a temporary directory to temporarily store WAL segments.
+ */
+static void
+setup_tmpwal_dir(const char *waldir)
+{
+       const char *tmpdir = getenv("TMPDIR");
+       char       *template;
+
+       Assert(TmpWalSegDir == NULL);
+
+       /*
+        * Use the directory specified by the TMPDIR environment variable. If it's
+        * not set, fall back to the provided WAL directory to store WAL files
+        * temporarily.
+        */
+       template = psprintf("%s/waldump_tmp-XXXXXX",
+                                               tmpdir ? tmpdir : waldir);
+       TmpWalSegDir = mkdtemp(template);
+
+       if (TmpWalSegDir == NULL)
+               pg_fatal("could not create directory \"%s\": %m", template);
+
+       canonicalize_path(TmpWalSegDir);
+
+       pg_log_debug("created directory \"%s\"", TmpWalSegDir);
+}
+
+/*
+ * Remove temporary directory at exit, if any.
+ */
+static void
+cleanup_tmpwal_dir_atexit(void)
+{
+       Assert(TmpWalSegDir != NULL);
+
+       rmtree(TmpWalSegDir, true);
+
+       TmpWalSegDir = NULL;
+}
+
+/*
+ * Open a file in the temporary spill directory for writing an out-of-order
+ * WAL segment, creating the directory and registering the cleanup callback
+ * if not already done.  Returns the open file handle.
+ */
+static FILE *
+prepare_tmp_write(const char *fname, XLogDumpPrivate *privateInfo)
+{
+       char            fpath[MAXPGPATH];
+       FILE       *file;
+
+       /*
+        * Setup temporary directory to store WAL segments and set up an exit
+        * callback to remove it upon completion if not already.
+        */
+       if (unlikely(TmpWalSegDir == NULL))
+       {
+               setup_tmpwal_dir(privateInfo->archive_dir);
+               atexit(cleanup_tmpwal_dir_atexit);
+       }
+
+       snprintf(fpath, MAXPGPATH, "%s/%s", TmpWalSegDir, fname);
+
+       /* Open the spill file for writing */
+       file = fopen(fpath, PG_BINARY_W);
+       if (file == NULL)
+               pg_fatal("could not create file \"%s\": %m", fpath);
+
+#ifndef WIN32
+       if (chmod(fpath, pg_file_create_mode))
+               pg_fatal("could not set permissions on file \"%s\": %m",
+                                fpath);
+#endif
+
+       pg_log_debug("spilling to temporary file \"%s\"", fpath);
+
+       return file;
+}
+
+/*
+ * Write buffer data to the given file handle.
+ */
+static void
+perform_tmp_write(const char *fname, StringInfo buf, FILE *file)
+{
+       Assert(file);
+
+       errno = 0;
+       if (buf->len > 0 && fwrite(buf->data, buf->len, 1, file) != 1)
+       {
+               /*
+                * If write didn't set errno, assume problem is no disk space
+                */
+               if (errno == 0)
+                       errno = ENOSPC;
+               pg_fatal("could not write to file \"%s/%s\": %m", TmpWalSegDir, fname);
+       }
+}
+
+/*
+ * Create an astreamer that can read WAL from tar file.
+ */
+static astreamer *
+astreamer_waldump_new(XLogDumpPrivate *privateInfo)
+{
+       astreamer_waldump *streamer;
+
+       streamer = palloc0_object(astreamer_waldump);
+       *((const astreamer_ops **) &streamer->base.bbs_ops) =
+               &astreamer_waldump_ops;
+
+       streamer->privateInfo = privateInfo;
+
+       return &streamer->base;
+}
+
+/*
+ * Main entry point of the archive streamer for reading WAL data from a tar
+ * file. If a member is identified as a valid WAL file, a hash entry is created
+ * for it, and its contents are copied into that entry's buffer, making them
+ * accessible to the decoding routine.
+ */
+static void
+astreamer_waldump_content(astreamer *streamer, astreamer_member *member,
+                                                 const char *data, int len,
+                                                 astreamer_archive_context context)
+{
+       astreamer_waldump *mystreamer = (astreamer_waldump *) streamer;
+       XLogDumpPrivate *privateInfo = mystreamer->privateInfo;
+
+       Assert(context != ASTREAMER_UNKNOWN);
+
+       switch (context)
+       {
+               case ASTREAMER_MEMBER_HEADER:
+                       {
+                               char       *fname = NULL;
+                               ArchivedWALFile *entry;
+                               bool            found;
+
+                               pg_log_debug("reading \"%s\"", member->pathname);
+
+                               if (!member_is_wal_file(mystreamer, member, &fname))
+                                       break;
+
+                               /*
+                                * Skip range filtering during initial startup, before the WAL
+                                * segment size and segment number bounds are known.
+                                */
+                               if (!READ_ANY_WAL(privateInfo))
+                               {
+                                       XLogSegNo       segno;
+                                       TimeLineID      timeline;
+
+                                       /*
+                                        * Skip the segment if the timeline does not match, if it
+                                        * falls outside the caller-specified range.
+                                        */
+                                       XLogFromFileName(fname, &timeline, &segno, privateInfo->segsize);
+                                       if (privateInfo->timeline != timeline ||
+                                               privateInfo->start_segno > segno ||
+                                               privateInfo->end_segno < segno)
+                                       {
+                                               pfree(fname);
+                                               break;
+                                       }
+                               }
+
+                               entry = ArchivedWAL_insert(privateInfo->archive_wal_htab,
+                                                                                  fname, &found);
+
+                               /*
+                                * Shouldn't happen, but if it does, simply ignore the
+                                * duplicate WAL file.
+                                */
+                               if (found)
+                               {
+                                       pg_log_warning("ignoring duplicate WAL \"%s\" found in archive \"%s\"",
+                                                                  member->pathname, privateInfo->archive_name);
+                                       pfree(fname);
+                                       break;
+                               }
+
+                               entry->buf = makeStringInfo();
+                               entry->spilled = false;
+                               entry->read_len = 0;
+                               privateInfo->cur_file = entry;
+                       }
+                       break;
+
+               case ASTREAMER_MEMBER_CONTENTS:
+                       if (privateInfo->cur_file)
+                       {
+                               appendBinaryStringInfo(privateInfo->cur_file->buf, data, len);
+                               privateInfo->cur_file->read_len += len;
+                       }
+                       break;
+
+               case ASTREAMER_MEMBER_TRAILER:
+
+                       /*
+                        * End of this tar member; mark cur_file NULL so subsequent
+                        * content callbacks (if any) know no WAL file is currently
+                        * active.
+                        */
+                       privateInfo->cur_file = NULL;
+                       break;
+
+               case ASTREAMER_ARCHIVE_TRAILER:
+                       break;
+
+               default:
+                       /* Shouldn't happen. */
+                       pg_fatal("unexpected state while parsing tar file");
+       }
+}
+
+/*
+ * End-of-stream processing for an astreamer_waldump stream.  This is a
+ * terminal streamer so it must have no successor.
+ */
+static void
+astreamer_waldump_finalize(astreamer *streamer)
+{
+       Assert(streamer->bbs_next == NULL);
+}
+
+/*
+ * Free memory associated with an astreamer_waldump stream.
+ */
+static void
+astreamer_waldump_free(astreamer *streamer)
+{
+       Assert(streamer->bbs_next == NULL);
+       pfree(streamer);
+}
+
+/*
+ * Returns true if the archive member name matches the WAL naming format. If
+ * successful, it also outputs the WAL segment name.
+ */
+static bool
+member_is_wal_file(astreamer_waldump *mystreamer, astreamer_member *member,
+                                  char **fname)
+{
+       int                     pathlen;
+       char            pathname[MAXPGPATH];
+       char       *filename;
+
+       /* We are only interested in normal files */
+       if (member->is_directory || member->is_link)
+               return false;
+
+       if (strlen(member->pathname) < XLOG_FNAME_LEN)
+               return false;
+
+       /*
+        * For a correct comparison, we must remove any '.' or '..' components
+        * from the member pathname. Similar to member_verify_header(), we prepend
+        * './' to the path so that canonicalize_path() can properly resolve and
+        * strip these references from the tar member name.
+        */
+       snprintf(pathname, MAXPGPATH, "./%s", member->pathname);
+       canonicalize_path(pathname);
+       pathlen = strlen(pathname);
+
+       /* Skip files in subdirectories other than pg_wal/ */
+       if (pathlen > XLOG_FNAME_LEN &&
+               strncmp(pathname, XLOGDIR, strlen(XLOGDIR)) != 0)
+               return false;
+
+       /* WAL file may appear with a full path (e.g., pg_wal/<name>) */
+       filename = pathname + (pathlen - XLOG_FNAME_LEN);
+       if (!IsXLogFileName(filename))
+               return false;
+
+       *fname = pnstrdup(filename, XLOG_FNAME_LEN);
+
+       return true;
+}
+
+/*
+ * Helper function for WAL file hash table.
+ */
+static uint32
+hash_string_pointer(const char *s)
+{
+       unsigned char *ss = (unsigned char *) s;
+
+       return hash_bytes(ss, strlen(s));
+}
index 633a9874bb5d4dd3e3d66fd323eeb0a41273709f..5296f21b82c7fdcb0355cab0ffe96e8a4cab4a25 100644 (file)
@@ -1,6 +1,7 @@
 # Copyright (c) 2022-2026, PostgreSQL Global Development Group
 
 pg_waldump_sources = files(
+  'archive_waldump.c',
   'compat.c',
   'pg_waldump.c',
   'rmgrdesc.c',
@@ -18,7 +19,7 @@ endif
 
 pg_waldump = executable('pg_waldump',
   pg_waldump_sources,
-  dependencies: [frontend_code, lz4, zstd],
+  dependencies: [frontend_code, libpq, lz4, zstd],
   c_args: ['-DFRONTEND'], # needed for xlogreader et al
   kwargs: default_bin_args,
 )
@@ -29,6 +30,7 @@ tests += {
   'sd': meson.current_source_dir(),
   'bd': meson.current_build_dir(),
   'tap': {
+    'env': {'TAR': tar.found() ? tar.full_path() : ''},
     'tests': [
       't/001_basic.pl',
       't/002_save_fullpage.pl',
index 5d31b15dbd8f8530835c1eb1e25e7ef906c5934b..f82507ef6967c001bc492b6f1a09844c3a27c353 100644 (file)
@@ -176,7 +176,7 @@ split_path(const char *path, char **dir, char **fname)
  *
  * return a read only fd
  */
-static int
+int
 open_file_in_directory(const char *directory, const char *fname)
 {
        int                     fd = -1;
@@ -327,8 +327,8 @@ identify_target_directory(char *directory, char *fname, int *WalSegSz)
 }
 
 /*
- * Returns the size in bytes of the data to be read. Returns -1 if the end
- * point has already been reached.
+ * Returns the number of bytes to read for the given page.  Returns -1 if
+ * the requested range has already been reached or exceeded.
  */
 static inline int
 required_read_len(XLogDumpPrivate *private, XLogRecPtr targetPagePtr,
@@ -412,7 +412,7 @@ WALDumpReadPage(XLogReaderState *state, XLogRecPtr targetPagePtr, int reqLen,
        int                     count = required_read_len(private, targetPagePtr, reqLen);
        WALReadError errinfo;
 
-       /* Bail out if the count to be read is not valid */
+       /* Bail out if the end of the requested range has already been reached */
        if (count < 0)
                return -1;
 
@@ -440,6 +440,109 @@ WALDumpReadPage(XLogReaderState *state, XLogRecPtr targetPagePtr, int reqLen,
        return count;
 }
 
+/*
+ * pg_waldump's XLogReaderRoutine->segment_open callback to support dumping WAL
+ * files from tar archives.  Segment tracking is handled by
+ * TarWALDumpReadPage, so no action is needed here.
+ */
+static void
+TarWALDumpOpenSegment(XLogReaderState *state, XLogSegNo nextSegNo,
+                                         TimeLineID *tli_p)
+{
+       /* No action needed */
+}
+
+/*
+ * pg_waldump's XLogReaderRoutine->segment_close callback to support dumping
+ * WAL files from tar archives.  Segment tracking is handled by
+ * TarWALDumpReadPage, so no action is needed here.
+ */
+static void
+TarWALDumpCloseSegment(XLogReaderState *state)
+{
+       /* No action needed */
+}
+
+/*
+ * pg_waldump's XLogReaderRoutine->page_read callback to support dumping WAL
+ * files from tar archives.
+ */
+static int
+TarWALDumpReadPage(XLogReaderState *state, XLogRecPtr targetPagePtr, int reqLen,
+                                  XLogRecPtr targetPtr, char *readBuff)
+{
+       XLogDumpPrivate *private = state->private_data;
+       int                     count = required_read_len(private, targetPagePtr, reqLen);
+       int                     segsize = state->segcxt.ws_segsize;
+       XLogSegNo       curSegNo;
+
+       /* Bail out if the end of the requested range has already been reached */
+       if (count < 0)
+               return -1;
+
+       /*
+        * If the target page is in a different segment, release the hash entry
+        * buffer and remove any spilled temporary file for the previous segment.
+        * Since pg_waldump never requests the same WAL bytes twice, moving to a
+        * new segment means the previous segment's data will not be needed again.
+        *
+        * Afterward, check whether the next required WAL segment was already
+        * spilled to the temporary directory before invoking the archive
+        * streamer.
+        */
+       curSegNo = state->seg.ws_segno;
+       if (!XLByteInSeg(targetPagePtr, curSegNo, segsize))
+       {
+               char            fname[MAXFNAMELEN];
+               XLogSegNo       nextSegNo;
+
+               /*
+                * Calculate the next WAL segment to be decoded from the given page
+                * pointer.
+                */
+               XLByteToSeg(targetPagePtr, nextSegNo, segsize);
+               state->seg.ws_tli = private->timeline;
+               state->seg.ws_segno = nextSegNo;
+
+               /* Close the WAL segment file if it is currently open */
+               if (state->seg.ws_file >= 0)
+               {
+                       close(state->seg.ws_file);
+                       state->seg.ws_file = -1;
+               }
+
+               /*
+                * If in pre-reading mode (prior to actual decoding), do not delete
+                * any entries that might be requested again once the decoding loop
+                * starts. For more details, see the comments in
+                * read_archive_wal_page().
+                */
+               if (private->decoding_started && curSegNo < nextSegNo)
+               {
+                       XLogFileName(fname, state->seg.ws_tli, curSegNo, segsize);
+                       free_archive_wal_entry(fname, private);
+               }
+
+               /*
+                * If the next segment exists in the temporary spill directory, open
+                * it and continue reading from there.
+                */
+               if (TmpWalSegDir != NULL)
+               {
+                       XLogFileName(fname, state->seg.ws_tli, nextSegNo, segsize);
+                       state->seg.ws_file = open_file_in_directory(TmpWalSegDir, fname);
+               }
+       }
+
+       /* Continue reading from the open WAL segment, if any */
+       if (state->seg.ws_file >= 0)
+               return WALDumpReadPage(state, targetPagePtr, count, targetPtr,
+                                                          readBuff);
+
+       /* Otherwise, read the WAL page from the archive streamer */
+       return read_archive_wal_page(private, targetPagePtr, count, readBuff);
+}
+
 /*
  * Boolean to return whether the given WAL record matches a specific relation
  * and optionally block.
@@ -777,8 +880,8 @@ usage(void)
        printf(_("  -F, --fork=FORK        only show records that modify blocks in fork FORK;\n"
                         "                         valid names are main, fsm, vm, init\n"));
        printf(_("  -n, --limit=N          number of records to display\n"));
-       printf(_("  -p, --path=PATH        directory in which to find WAL segment files or a\n"
-                        "                         directory with a ./pg_wal that contains such files\n"
+       printf(_("  -p, --path=PATH        a tar archive or a directory in which to find WAL segment files or\n"
+                        "                         a directory with a pg_wal subdirectory containing such files\n"
                         "                         (default: current directory, ./pg_wal, $PGDATA/pg_wal)\n"));
        printf(_("  -q, --quiet            do not print any output, except for errors\n"));
        printf(_("  -r, --rmgr=RMGR        only show records generated by resource manager RMGR;\n"
@@ -811,6 +914,7 @@ main(int argc, char **argv)
        XLogRecPtr      first_record;
        char       *waldir = NULL;
        char       *errormsg;
+       pg_compress_algorithm compression = PG_COMPRESSION_NONE;
 
        static struct option long_options[] = {
                {"bkp-details", no_argument, NULL, 'b'},
@@ -868,6 +972,10 @@ main(int argc, char **argv)
        private.startptr = InvalidXLogRecPtr;
        private.endptr = InvalidXLogRecPtr;
        private.endptr_reached = false;
+       private.decoding_started = false;
+       private.archive_name = NULL;
+       private.start_segno = 0;
+       private.end_segno = UINT64_MAX;
 
        config.quiet = false;
        config.bkp_details = false;
@@ -1109,8 +1217,13 @@ main(int argc, char **argv)
 
        if (waldir != NULL)
        {
-               /* validate path points to directory */
-               if (!verify_directory(waldir))
+               /* Check whether the path looks like a tar archive by its extension */
+               if (parse_tar_compress_algorithm(waldir, &compression))
+               {
+                       split_path(waldir, &private.archive_dir, &private.archive_name);
+               }
+               /* Otherwise it must be a directory */
+               else if (!verify_directory(waldir))
                {
                        pg_log_error("could not open directory \"%s\": %m", waldir);
                        goto bad_argument;
@@ -1128,6 +1241,17 @@ main(int argc, char **argv)
                int                     fd;
                XLogSegNo       segno;
 
+               /*
+                * If a tar archive is passed using the --path option, all other
+                * arguments become unnecessary.
+                */
+               if (private.archive_name)
+               {
+                       pg_log_error("unnecessary command-line arguments specified with tar archive (first is \"%s\")",
+                                                argv[optind]);
+                       goto bad_argument;
+               }
+
                split_path(argv[optind], &directory, &fname);
 
                if (waldir == NULL && directory != NULL)
@@ -1138,68 +1262,75 @@ main(int argc, char **argv)
                                pg_fatal("could not open directory \"%s\": %m", waldir);
                }
 
-               waldir = identify_target_directory(waldir, fname, &private.segsize);
-               fd = open_file_in_directory(waldir, fname);
-               if (fd < 0)
-                       pg_fatal("could not open file \"%s\"", fname);
-               close(fd);
-
-               /* parse position from file */
-               XLogFromFileName(fname, &private.timeline, &segno, private.segsize);
-
-               if (!XLogRecPtrIsValid(private.startptr))
-                       XLogSegNoOffsetToRecPtr(segno, 0, private.segsize, private.startptr);
-               else if (!XLByteInSeg(private.startptr, segno, private.segsize))
+               if (fname != NULL && parse_tar_compress_algorithm(fname, &compression))
                {
-                       pg_log_error("start WAL location %X/%08X is not inside file \"%s\"",
-                                                LSN_FORMAT_ARGS(private.startptr),
-                                                fname);
-                       goto bad_argument;
+                       private.archive_dir = waldir;
+                       private.archive_name = fname;
                }
-
-               /* no second file specified, set end position */
-               if (!(optind + 1 < argc) && !XLogRecPtrIsValid(private.endptr))
-                       XLogSegNoOffsetToRecPtr(segno + 1, 0, private.segsize, private.endptr);
-
-               /* parse ENDSEG if passed */
-               if (optind + 1 < argc)
+               else
                {
-                       XLogSegNo       endsegno;
-
-                       /* ignore directory, already have that */
-                       split_path(argv[optind + 1], &directory, &fname);
-
+                       waldir = identify_target_directory(waldir, fname, &private.segsize);
                        fd = open_file_in_directory(waldir, fname);
                        if (fd < 0)
                                pg_fatal("could not open file \"%s\"", fname);
                        close(fd);
 
                        /* parse position from file */
-                       XLogFromFileName(fname, &private.timeline, &endsegno, private.segsize);
+                       XLogFromFileName(fname, &private.timeline, &segno, private.segsize);
+
+                       if (!XLogRecPtrIsValid(private.startptr))
+                               XLogSegNoOffsetToRecPtr(segno, 0, private.segsize, private.startptr);
+                       else if (!XLByteInSeg(private.startptr, segno, private.segsize))
+                       {
+                               pg_log_error("start WAL location %X/%08X is not inside file \"%s\"",
+                                                        LSN_FORMAT_ARGS(private.startptr),
+                                                        fname);
+                               goto bad_argument;
+                       }
 
-                       if (endsegno < segno)
-                               pg_fatal("ENDSEG %s is before STARTSEG %s",
-                                                argv[optind + 1], argv[optind]);
+                       /* no second file specified, set end position */
+                       if (!(optind + 1 < argc) && !XLogRecPtrIsValid(private.endptr))
+                               XLogSegNoOffsetToRecPtr(segno + 1, 0, private.segsize, private.endptr);
 
-                       if (!XLogRecPtrIsValid(private.endptr))
-                               XLogSegNoOffsetToRecPtr(endsegno + 1, 0, private.segsize,
-                                                                               private.endptr);
+                       /* parse ENDSEG if passed */
+                       if (optind + 1 < argc)
+                       {
+                               XLogSegNo       endsegno;
 
-                       /* set segno to endsegno for check of --end */
-                       segno = endsegno;
-               }
+                               /* ignore directory, already have that */
+                               split_path(argv[optind + 1], &directory, &fname);
 
+                               fd = open_file_in_directory(waldir, fname);
+                               if (fd < 0)
+                                       pg_fatal("could not open file \"%s\"", fname);
+                               close(fd);
 
-               if (!XLByteInSeg(private.endptr, segno, private.segsize) &&
-                       private.endptr != (segno + 1) * private.segsize)
-               {
-                       pg_log_error("end WAL location %X/%08X is not inside file \"%s\"",
-                                                LSN_FORMAT_ARGS(private.endptr),
-                                                argv[argc - 1]);
-                       goto bad_argument;
+                               /* parse position from file */
+                               XLogFromFileName(fname, &private.timeline, &endsegno, private.segsize);
+
+                               if (endsegno < segno)
+                                       pg_fatal("ENDSEG %s is before STARTSEG %s",
+                                                        argv[optind + 1], argv[optind]);
+
+                               if (!XLogRecPtrIsValid(private.endptr))
+                                       XLogSegNoOffsetToRecPtr(endsegno + 1, 0, private.segsize,
+                                                                                       private.endptr);
+
+                               /* set segno to endsegno for check of --end */
+                               segno = endsegno;
+                       }
+
+                       if (!XLByteInSeg(private.endptr, segno, private.segsize) &&
+                               private.endptr != (segno + 1) * private.segsize)
+                       {
+                               pg_log_error("end WAL location %X/%08X is not inside file \"%s\"",
+                                                        LSN_FORMAT_ARGS(private.endptr),
+                                                        argv[argc - 1]);
+                               goto bad_argument;
+                       }
                }
        }
-       else
+       else if (!private.archive_name)
                waldir = identify_target_directory(waldir, NULL, &private.segsize);
 
        /* we don't know what to print */
@@ -1209,15 +1340,46 @@ main(int argc, char **argv)
                goto bad_argument;
        }
 
+       /* --follow is not supported with tar archives */
+       if (config.follow && private.archive_name)
+       {
+               pg_log_error("--follow is not supported when reading from a tar archive");
+               goto bad_argument;
+       }
+
        /* done with argument parsing, do the actual work */
 
        /* we have everything we need, start reading */
-       xlogreader_state =
-               XLogReaderAllocate(private.segsize, waldir,
-                                                  XL_ROUTINE(.page_read = WALDumpReadPage,
-                                                                         .segment_open = WALDumpOpenSegment,
-                                                                         .segment_close = WALDumpCloseSegment),
-                                                  &private);
+       if (private.archive_name)
+       {
+               /*
+                * A NULL directory indicates that the archive file is located in the
+                * current working directory.
+                */
+               if (private.archive_dir == NULL)
+                       private.archive_dir = pg_strdup(".");
+
+               /* Set up for reading tar file */
+               init_archive_reader(&private, compression);
+
+               /* Routine to decode WAL files in tar archive */
+               xlogreader_state =
+                       XLogReaderAllocate(private.segsize, private.archive_dir,
+                                                          XL_ROUTINE(.page_read = TarWALDumpReadPage,
+                                                                                 .segment_open = TarWALDumpOpenSegment,
+                                                                                 .segment_close = TarWALDumpCloseSegment),
+                                                          &private);
+       }
+       else
+       {
+               xlogreader_state =
+                       XLogReaderAllocate(private.segsize, waldir,
+                                                          XL_ROUTINE(.page_read = WALDumpReadPage,
+                                                                                 .segment_open = WALDumpOpenSegment,
+                                                                                 .segment_close = WALDumpCloseSegment),
+                                                          &private);
+       }
+
        if (!xlogreader_state)
                pg_fatal("out of memory while allocating a WAL reading processor");
 
@@ -1245,6 +1407,9 @@ main(int argc, char **argv)
        if (config.stats == true && !config.quiet)
                stats.startptr = first_record;
 
+       /* Flag indicating that the decoding loop has been entered */
+       private.decoding_started = true;
+
        for (;;)
        {
                if (time_to_stop)
@@ -1326,6 +1491,9 @@ main(int argc, char **argv)
 
        XLogReaderFree(xlogreader_state);
 
+       if (private.archive_name)
+               free_archive_reader(&private);
+
        return EXIT_SUCCESS;
 
 bad_argument:
index 013b051506f656976d2d01e2989efac7702ec8eb..36893624f53709ee6fb4b760e581d2cf350542ff 100644 (file)
 #define PG_WALDUMP_H
 
 #include "access/xlogdefs.h"
+#include "fe_utils/astreamer.h"
+
+/* Forward declaration */
+struct ArchivedWALFile;
+struct ArchivedWAL_hash;
+
+/* Temporary directory for spilling out-of-order WAL segments from archives */
+extern char *TmpWalSegDir;
 
 /* Contains the necessary information to drive WAL decoding */
 typedef struct XLogDumpPrivate
@@ -21,6 +29,49 @@ typedef struct XLogDumpPrivate
        XLogRecPtr      startptr;
        XLogRecPtr      endptr;
        bool            endptr_reached;
+       bool            decoding_started;
+
+       /* Fields required to read WAL from archive */
+       char       *archive_dir;
+       char       *archive_name;       /* Tar archive filename */
+       int                     archive_fd;             /* File descriptor for the open tar file */
+
+       astreamer  *archive_streamer;
+       char       *archive_read_buf;   /* Reusable read buffer for archive I/O */
+
+#ifdef USE_ASSERT_CHECKING
+       Size            archive_read_buf_size;
+#endif
+
+       /* What the archive streamer is currently reading */
+       struct ArchivedWALFile *cur_file;
+
+       /*
+        * Hash table of WAL segments currently buffered from the archive,
+        * including any segment currently being streamed.  Entries are removed
+        * once consumed, so this does not accumulate all segments ever read.
+        */
+       struct ArchivedWAL_hash *archive_wal_htab;
+
+       /*
+        * Pre-computed segment numbers derived from startptr and endptr. Caching
+        * them avoids repeated XLByteToSeg() calls when filtering each archive
+        * member against the requested WAL range.  end_segno is initialized to
+        * UINT64_MAX when no end limit is requested.
+        */
+       XLogSegNo       start_segno;
+       XLogSegNo       end_segno;
 } XLogDumpPrivate;
 
+extern int     open_file_in_directory(const char *directory, const char *fname);
+
+extern void init_archive_reader(XLogDumpPrivate *privateInfo,
+                                                               pg_compress_algorithm compression);
+extern void free_archive_reader(XLogDumpPrivate *privateInfo);
+extern int     read_archive_wal_page(XLogDumpPrivate *privateInfo,
+                                                                 XLogRecPtr targetPagePtr,
+                                                                 Size count, char *readBuff);
+extern void free_archive_wal_entry(const char *fname,
+                                                                  XLogDumpPrivate *privateInfo);
+
 #endif                                                 /* PG_WALDUMP_H */
index 5db5d20136f30943807973b9287cc3e4cb188734..11df7e092bf09a80ab27ca5ca8dbf73a6c5405cb 100644 (file)
@@ -3,9 +3,13 @@
 
 use strict;
 use warnings FATAL => 'all';
+use Cwd;
 use PostgreSQL::Test::Cluster;
 use PostgreSQL::Test::Utils;
 use Test::More;
+use List::Util qw(shuffle);
+
+my $tar = $ENV{TAR};
 
 program_help_ok('pg_waldump');
 program_version_ok('pg_waldump');
@@ -162,6 +166,42 @@ CREATE TABLESPACE ts1 LOCATION '$tblspc_path';
 DROP TABLESPACE ts1;
 });
 
+# Test: Decode a continuation record (contrecord) that spans multiple WAL
+# segments.
+#
+# Now consume all remaining room in the current WAL segment, leaving
+# space enough only for the start of a largish record.
+$node->safe_psql(
+       'postgres', q{
+DO $$
+DECLARE
+    wal_segsize int := setting::int FROM pg_settings WHERE name = 'wal_segment_size';
+    remain int;
+    iters  int := 0;
+BEGIN
+    LOOP
+        INSERT into t1(b)
+        select repeat(encode(sha256(g::text::bytea), 'hex'), (random() * 15 + 1)::int)
+        from generate_series(1, 10) g;
+
+        remain := wal_segsize - (pg_current_wal_insert_lsn() - '0/0') % wal_segsize;
+        IF remain < 2 * setting::int from pg_settings where name = 'block_size' THEN
+            RAISE log 'exiting after % iterations, % bytes to end of WAL segment', iters, remain;
+            EXIT;
+        END IF;
+        iters := iters + 1;
+    END LOOP;
+END
+$$;
+});
+
+my $contrecord_lsn = $node->safe_psql('postgres',
+       'SELECT pg_current_wal_insert_lsn()');
+# Generate contrecord record
+$node->safe_psql('postgres',
+       qq{SELECT pg_logical_emit_message(true, 'test 026', repeat('xyzxz', 123456))}
+);
+
 my ($end_lsn, $end_walfile) = split /\|/,
   $node->safe_psql('postgres',
        q{SELECT pg_current_wal_insert_lsn(), pg_walfile_name(pg_current_wal_insert_lsn())}
@@ -198,51 +238,23 @@ command_like(
        ],
        qr/./,
        'runs with start and end segment specified');
-command_fails_like(
-       [ 'pg_waldump', '--path' => $node->data_dir ],
-       qr/error: no start WAL location given/,
-       'path option requires start location');
-command_like(
-       [
-               'pg_waldump',
-               '--path' => $node->data_dir,
-               '--start' => $start_lsn,
-               '--end' => $end_lsn,
-       ],
-       qr/./,
-       'runs with path option and start and end locations');
-command_fails_like(
-       [
-               'pg_waldump',
-               '--path' => $node->data_dir,
-               '--start' => $start_lsn,
-       ],
-       qr/error: error in WAL record at/,
-       'falling off the end of the WAL results in an error');
-
 command_like(
        [
-               'pg_waldump', '--quiet',
-               $node->data_dir . '/pg_wal/' . $start_walfile
+               'pg_waldump', '--quiet', '--path',
+               $node->data_dir . '/pg_wal/', $start_walfile
        ],
        qr/^$/,
        'no output with --quiet option');
-command_fails_like(
-       [
-               'pg_waldump', '--quiet',
-               '--path' => $node->data_dir,
-               '--start' => $start_lsn
-       ],
-       qr/error: error in WAL record at/,
-       'errors are shown with --quiet');
-
 
 # Test for: Display a message that we're skipping data if `from`
 # wasn't a pointer to the start of a record.
+sub test_pg_waldump_skip_bytes
 {
+       my ($path, $startlsn, $endlsn) = @_;
+
        # Construct a new LSN that is one byte past the original
        # start_lsn.
-       my ($part1, $part2) = split qr{/}, $start_lsn;
+       my ($part1, $part2) = split qr{/}, $startlsn;
        my $lsn2 = hex $part2;
        $lsn2++;
        my $new_start = sprintf("%s/%X", $part1, $lsn2);
@@ -252,7 +264,8 @@ command_fails_like(
        my $result = IPC::Run::run [
                'pg_waldump',
                '--start' => $new_start,
-               $node->data_dir . '/pg_wal/' . $start_walfile
+               '--end' => $endlsn,
+               '--path' => $path,
          ],
          '>' => \$stdout,
          '2>' => \$stderr;
@@ -266,15 +279,15 @@ command_fails_like(
 sub test_pg_waldump
 {
        local $Test::Builder::Level = $Test::Builder::Level + 1;
-       my @opts = @_;
+       my ($path, $startlsn, $endlsn, @opts) = @_;
 
        my ($stdout, $stderr);
 
        my $result = IPC::Run::run [
                'pg_waldump',
-               '--path' => $node->data_dir,
-               '--start' => $start_lsn,
-               '--end' => $end_lsn,
+               '--start' => $startlsn,
+               '--end' => $endlsn,
+               '--path' => $path,
                @opts
          ],
          '>' => \$stdout,
@@ -286,40 +299,145 @@ sub test_pg_waldump
        return @lines;
 }
 
-my @lines;
-
-@lines = test_pg_waldump;
-is(grep(!/^rmgr: \w/, @lines), 0, 'all output lines are rmgr lines');
-
-@lines = test_pg_waldump('--limit' => 6);
-is(@lines, 6, 'limit option observed');
-
-@lines = test_pg_waldump('--fullpage');
-is(grep(!/^rmgr:.*\bFPW\b/, @lines), 0, 'all output lines are FPW');
-
-@lines = test_pg_waldump('--stats');
-like($lines[0], qr/WAL statistics/, "statistics on stdout");
-is(grep(/^rmgr:/, @lines), 0, 'no rmgr lines output');
-
-@lines = test_pg_waldump('--stats=record');
-like($lines[0], qr/WAL statistics/, "statistics on stdout");
-is(grep(/^rmgr:/, @lines), 0, 'no rmgr lines output');
-
-@lines = test_pg_waldump('--rmgr' => 'Btree');
-is(grep(!/^rmgr: Btree/, @lines), 0, 'only Btree lines');
-
-@lines = test_pg_waldump('--fork' => 'init');
-is(grep(!/fork init/, @lines), 0, 'only init fork lines');
-
-@lines = test_pg_waldump(
-       '--relation' => "$default_ts_oid/$postgres_db_oid/$rel_t1_oid");
-is(grep(!/rel $default_ts_oid\/$postgres_db_oid\/$rel_t1_oid/, @lines),
-       0, 'only lines for selected relation');
-
-@lines = test_pg_waldump(
-       '--relation' => "$default_ts_oid/$postgres_db_oid/$rel_i1a_oid",
-       '--block' => 1);
-is(grep(!/\bblk 1\b/, @lines), 0, 'only lines for selected block');
+# Create a tar archive, shuffle the file order
+sub generate_archive
+{
+       my ($archive, $directory, $compression_flags) = @_;
+
+       my @files;
+       opendir my $dh, $directory or die "opendir: $!";
+       while (my $entry = readdir $dh) {
+               # Skip '.' and '..'
+               next if $entry eq '.' || $entry eq '..';
+               push @files, $entry;
+       }
+       closedir $dh;
+
+       @files = shuffle @files;
+
+       # move into the WAL directory before archiving files
+       my $cwd = getcwd;
+       chdir($directory) || die "chdir: $!";
+       command_ok([$tar, $compression_flags, $archive, @files]);
+       chdir($cwd) || die "chdir: $!";
+}
 
+my $tmp_dir = PostgreSQL::Test::Utils::tempdir_short();
+
+my @scenarios = (
+       {
+               'path' => $node->data_dir,
+               'is_archive' => 0,
+               'enabled' => 1
+       },
+       {
+               'path' => "$tmp_dir/pg_wal.tar",
+               'compression_method' => 'none',
+               'compression_flags' => '-cf',
+               'is_archive' => 1,
+               'enabled' => 1
+       },
+       {
+               'path' => "$tmp_dir/pg_wal.tar.gz",
+               'compression_method' => 'gzip',
+               'compression_flags' => '-czf',
+               'is_archive' => 1,
+               'enabled' => check_pg_config("#define HAVE_LIBZ 1")
+       });
+
+for my $scenario (@scenarios)
+{
+       my $path = $scenario->{'path'};
+
+       SKIP:
+       {
+               skip "tar command is not available", 56
+                 if !defined $tar && $scenario->{'is_archive'};
+               skip "$scenario->{'compression_method'} compression not supported by this build", 56
+                 if !$scenario->{'enabled'} && $scenario->{'is_archive'};
+
+                 # create pg_wal archive
+                 if ($scenario->{'is_archive'})
+                 {
+                         generate_archive($path,
+                                 $node->data_dir . '/pg_wal',
+                                 $scenario->{'compression_flags'});
+                 }
+
+               command_fails_like(
+                       [ 'pg_waldump', '--path' => $path ],
+                       qr/error: no start WAL location given/,
+                       'path option requires start location');
+               command_like(
+                       [
+                               'pg_waldump',
+                               '--path' => $path,
+                               '--start' => $start_lsn,
+                               '--end' => $end_lsn,
+                       ],
+                       qr/./,
+                       'runs with path option and start and end locations');
+               command_fails_like(
+                       [
+                               'pg_waldump',
+                               '--path' => $path,
+                               '--start' => $start_lsn,
+                       ],
+                       qr/error: error in WAL record at/,
+                       'falling off the end of the WAL results in an error');
+
+               command_fails_like(
+                       [
+                               'pg_waldump', '--quiet',
+                               '--path' => $path,
+                               '--start' => $start_lsn
+                       ],
+                       qr/error: error in WAL record at/,
+                       'errors are shown with --quiet');
+
+               test_pg_waldump_skip_bytes($path, $start_lsn, $end_lsn);
+
+               my @lines = test_pg_waldump($path, $start_lsn, $end_lsn);
+               is(grep(!/^rmgr: \w/, @lines), 0, 'all output lines are rmgr lines');
+
+               @lines = test_pg_waldump($path, $contrecord_lsn, $end_lsn);
+               is(grep(!/^rmgr: \w/, @lines), 0, 'all output lines are rmgr lines');
+
+               test_pg_waldump_skip_bytes($path, $contrecord_lsn, $end_lsn);
+
+               @lines = test_pg_waldump($path, $start_lsn, $end_lsn, '--limit' => 6);
+               is(@lines, 6, 'limit option observed');
+
+               @lines = test_pg_waldump($path, $start_lsn, $end_lsn, '--fullpage');
+               is(grep(!/^rmgr:.*\bFPW\b/, @lines), 0, 'all output lines are FPW');
+
+               @lines = test_pg_waldump($path, $start_lsn, $end_lsn, '--stats');
+               like($lines[0], qr/WAL statistics/, "statistics on stdout");
+               is(grep(/^rmgr:/, @lines), 0, 'no rmgr lines output');
+
+               @lines = test_pg_waldump($path, $start_lsn, $end_lsn, '--stats=record');
+               like($lines[0], qr/WAL statistics/, "statistics on stdout");
+               is(grep(/^rmgr:/, @lines), 0, 'no rmgr lines output');
+
+               @lines = test_pg_waldump($path, $start_lsn, $end_lsn, '--rmgr' => 'Btree');
+               is(grep(!/^rmgr: Btree/, @lines), 0, 'only Btree lines');
+
+               @lines = test_pg_waldump($path, $start_lsn, $end_lsn, '--fork' => 'init');
+               is(grep(!/fork init/, @lines), 0, 'only init fork lines');
+
+               @lines = test_pg_waldump($path, $start_lsn, $end_lsn,
+                       '--relation' => "$default_ts_oid/$postgres_db_oid/$rel_t1_oid");
+               is(grep(!/rel $default_ts_oid\/$postgres_db_oid\/$rel_t1_oid/, @lines),
+                       0, 'only lines for selected relation');
+
+               @lines = test_pg_waldump($path, $start_lsn, $end_lsn,
+                       '--relation' => "$default_ts_oid/$postgres_db_oid/$rel_i1a_oid",
+                       '--block' => 1);
+               is(grep(!/\bblk 1\b/, @lines), 0, 'only lines for selected block');
+
+               # Cleanup.
+               unlink $path if $scenario->{'is_archive'};
+       }
+}
 
 done_testing();
index e1565329f1cc49fb18496bda1ab0d245cd0b4b9a..0042c33fa662f25a7d407b8aac782e5b9fd832b1 100644 (file)
@@ -147,6 +147,9 @@ ArchiveOpts
 ArchiveShutdownCB
 ArchiveStartupCB
 ArchiveStreamState
+ArchivedWALFile
+ArchivedWAL_hash
+ArchivedWAL_iterator
 ArchiverOutput
 ArchiverStage
 ArrayAnalyzeExtraData
@@ -3545,6 +3548,7 @@ astreamer_recovery_injector
 astreamer_tar_archiver
 astreamer_tar_parser
 astreamer_verify
+astreamer_waldump
 astreamer_zstd_frame
 auth_password_hook_typ
 autovac_table