static ArchivedWALFile *get_archive_wal_entry(const char *fname,
XLogDumpPrivate *privateInfo);
-static int read_archive_file(XLogDumpPrivate *privateInfo, Size count);
+static bool read_archive_file(XLogDumpPrivate *privateInfo, Size count);
static void setup_tmpwal_dir(const char *waldir);
static void cleanup_tmpwal_dir_atexit(void);
astreamer *streamer;
ArchivedWALFile *entry = NULL;
XLogLongPageHeader longhdr;
- XLogSegNo segno;
- TimeLineID timeline;
+ ArchivedWAL_iterator iter;
/* Open tar archive and store its file descriptor */
fd = open_file_in_directory(privateInfo->archive_dir,
pg_fatal("could not open file \"%s\"", privateInfo->archive_name);
privateInfo->archive_fd = fd;
+ privateInfo->archive_fd_eof = false;
streamer = astreamer_waldump_new(privateInfo);
privateInfo->archive_wal_htab = ArchivedWAL_create(8, NULL);
/*
- * Read until we have at least one full WAL page (XLOG_BLCKSZ bytes) from
- * the first WAL segment in the archive so we can extract the WAL segment
- * size from the long page header.
+ * Read until we have at least one WAL segment with enough data to extract
+ * the WAL segment size from the long page header.
+ *
+ * We must not rely on cur_file here, because it can become NULL if a
+ * member trailer is processed during a read_archive_file() call. Instead,
+ * scan the hash table after each read to find any entry with sufficient
+ * data.
*/
- while (entry == NULL || entry->buf->len < XLOG_BLCKSZ)
+ while (entry == NULL)
{
- if (read_archive_file(privateInfo, XLOG_BLCKSZ) == 0)
+ if (!read_archive_file(privateInfo, XLOG_BLCKSZ))
pg_fatal("could not find WAL in archive \"%s\"",
privateInfo->archive_name);
- entry = privateInfo->cur_file;
+ ArchivedWAL_start_iterate(privateInfo->archive_wal_htab, &iter);
+ while ((entry = ArchivedWAL_iterate(privateInfo->archive_wal_htab,
+ &iter)) != NULL)
+ {
+ if (entry->read_len >= sizeof(XLogLongPageHeaderData))
+ break;
+ }
}
/* Extract the WAL segment size from the long page header */
privateInfo->segsize);
/*
- * This WAL record was fetched before the filtering parameters
- * (start_segno and end_segno) were fully initialized. Perform the
- * relevance check against the user-provided range now; if the WAL falls
- * outside this range, remove it from the hash table. Subsequent WAL will
- * be filtered automatically by the archive streamer using the updated
- * start_segno and end_segno values.
+ * Now that we have initialized the filtering parameters (start_segno and
+ * end_segno), we can discard any already-loaded WAL hash table entries
+ * for segments we don't actually need. Subsequent WAL will be filtered
+ * automatically by the archive streamer using the updated start_segno and
+ * end_segno values.
*/
- XLogFromFileName(entry->fname, &timeline, &segno, privateInfo->segsize);
- if (privateInfo->timeline != timeline ||
- privateInfo->start_segno > segno ||
- privateInfo->end_segno < segno)
- free_archive_wal_entry(entry->fname, privateInfo);
+ ArchivedWAL_start_iterate(privateInfo->archive_wal_htab, &iter);
+ while ((entry = ArchivedWAL_iterate(privateInfo->archive_wal_htab,
+ &iter)) != NULL)
+ {
+ XLogSegNo segno;
+ TimeLineID timeline;
+
+ XLogFromFileName(entry->fname, &timeline, &segno, privateInfo->segsize);
+ if (privateInfo->timeline != timeline ||
+ privateInfo->start_segno > segno ||
+ privateInfo->end_segno < segno)
+ free_archive_wal_entry(entry->fname, privateInfo);
+ }
}
/*
/*
* NB: Normally, astreamer_finalize() is called before astreamer_free() to
* flush any remaining buffered data or to ensure the end of the tar
- * archive is reached. However, when decoding WAL, once we hit the end
- * LSN, any remaining buffered data or unread portion of the archive can
- * be safely ignored.
+ * archive is reached; read_archive_file() will already have done that if
+ * it reached EOF. However, when decoding WAL we can stop once we hit the
+ * end LSN, so we may never have read all of the input file. In that case
+ * any remaining buffered data or unread portion can be safely ignored.
*/
astreamer_free(privateInfo->archive_streamer);
fname, privateInfo->archive_name,
(long long int) (count - nbytes),
(long long int) count);
- if (read_archive_file(privateInfo, READ_CHUNK_SIZE) == 0)
+ if (!read_archive_file(privateInfo, READ_CHUNK_SIZE))
pg_fatal("unexpected end of archive \"%s\" while reading \"%s\": read %lld of %lld bytes",
privateInfo->archive_name, fname,
(long long int) (count - nbytes),
/*
* Returns the archived WAL entry from the hash table if it already exists.
* Otherwise, reads more data from the archive until the requested entry is
- * found. If the archive streamer is reading a WAL file from the archive that
+ * found. If the archive streamer reads a WAL file from the archive that
* is not currently needed, that data is spilled to a temporary file for later
* retrieval.
+ *
+ * Note that the returned entry might not have been completely read from
+ * the archive yet.
*/
static ArchivedWALFile *
get_archive_wal_entry(const char *fname, XLogDumpPrivate *privateInfo)
{
- ArchivedWALFile *entry = NULL;
- FILE *write_fp = NULL;
-
- /*
- * Search the hash table first. If the entry is found, return it.
- * Otherwise, the requested WAL entry hasn't been read from the archive
- * yet; invoke the archive streamer to fetch it.
- */
while (1)
{
+ ArchivedWALFile *entry;
+ ArchivedWAL_iterator iter;
+
/*
- * Search hash table.
- *
- * We perform the search inside the loop because a single iteration of
- * the archive reader may decompress and extract multiple files into
- * the hash table. One of these newly added files could be the one we
- * are seeking.
+ * Search the hash table first. If the entry is found, return it.
+ * Otherwise, the requested WAL entry hasn't been read from the
+ * archive yet; we must invoke the archive streamer to fetch it.
*/
entry = ArchivedWAL_lookup(privateInfo->archive_wal_htab, fname);
return entry;
/*
- * Capture the current entry before calling read_archive_file(),
- * because cur_file may advance to a new segment during streaming. We
- * hold this reference so we can flush any remaining buffer data and
- * close the write handle once we detect that cur_file has moved on.
- */
- entry = privateInfo->cur_file;
-
- /*
- * Fetch more data either when no current file is being tracked or
- * when its buffer has been fully flushed to the temporary file.
+ * Before loading more data, scan the hash table to see if we have
+ * loaded any files we don't need yet. If so, spill their data to
+ * disk to conserve memory space. But don't try to spill a
+ * partially-read file; it's not worth the complication.
*/
- if (entry == NULL || entry->buf->len == 0)
+ ArchivedWAL_start_iterate(privateInfo->archive_wal_htab, &iter);
+ while ((entry = ArchivedWAL_iterate(privateInfo->archive_wal_htab,
+ &iter)) != NULL)
{
- if (read_archive_file(privateInfo, READ_CHUNK_SIZE) == 0)
- break; /* archive file ended */
- }
+ FILE *write_fp;
- /*
- * Archive streamer is reading a non-WAL file or an irrelevant WAL
- * file.
- */
- if (entry == NULL)
- continue;
-
- /*
- * The streamer is producing a WAL segment that isn't the one asked
- * for; it must be arriving out of order. Spill its data to disk so
- * it can be read back when needed.
- */
- Assert(strcmp(fname, entry->fname) != 0);
+ /* OK to spill? */
+ if (entry->spilled)
+ continue; /* already spilled */
+ if (entry == privateInfo->cur_file)
+ continue; /* still being read */
- /* Create a temporary file if one does not already exist */
- if (!entry->spilled)
- {
+ /* Write out the completed WAL file contents to a temp file. */
write_fp = prepare_tmp_write(entry->fname, privateInfo);
+ perform_tmp_write(entry->fname, entry->buf, write_fp);
+ fclose(write_fp);
+
+ /* resetStringInfo won't release storage, so delete/recreate. */
+ destroyStringInfo(entry->buf);
+ entry->buf = makeStringInfo();
entry->spilled = true;
}
- /* Flush data from the buffer to the file */
- perform_tmp_write(entry->fname, entry->buf, write_fp);
- resetStringInfo(entry->buf);
-
/*
- * If cur_file changed since we captured entry above, the archive
- * streamer has finished this segment and moved on. Close its spill
- * file handle so data is flushed to disk before the next segment
- * starts writing to a different handle.
+ * Read more data. If we reach EOF, the desired file is not present.
*/
- if (entry != privateInfo->cur_file && write_fp != NULL)
- {
- fclose(write_fp);
- write_fp = NULL;
- }
+ if (!read_archive_file(privateInfo, READ_CHUNK_SIZE))
+ pg_fatal("could not find WAL \"%s\" in archive \"%s\"",
+ fname, privateInfo->archive_name);
}
-
- /* Requested WAL segment not found */
- pg_fatal("could not find WAL \"%s\" in archive \"%s\"",
- fname, privateInfo->archive_name);
}
/*
* Reads a chunk from the archive file and passes it through the streamer
* pipeline for decompression (if needed) and tar member extraction.
+ *
+ * count is the maximum amount to try to read this time. Note that it's
+ * measured in raw file bytes, and may have little to do with how much
+ * comes out of decompression/extraction.
+ *
+ * Returns true if input was processed (including the streamer flush done
+ * when EOF is first reached), false if EOF was reached by a prior call.
+ *
+ * Callers must be aware that a single call may trigger multiple callbacks
+ * in astreamer_waldump_content, so privateInfo->cur_file can change value
+ * (or become NULL) during a call. In particular, cur_file is set to NULL
+ * when the ASTREAMER_MEMBER_TRAILER callback fires at the end of a tar
+ * member; it is then set to a new entry when the next WAL member's
+ * ASTREAMER_MEMBER_HEADER callback fires, which may or may not happen
+ * within the same call.
*/
-static int
+static bool
read_archive_file(XLogDumpPrivate *privateInfo, Size count)
{
int rc;
/* The read request must not exceed the allocated buffer size. */
Assert(privateInfo->archive_read_buf_size >= count);
+ /* Fail if we already reached EOF in a prior call. */
+ if (privateInfo->archive_fd_eof)
+ return false;
+
+ /* Try to read some more data. */
rc = read(privateInfo->archive_fd, privateInfo->archive_read_buf, count);
if (rc < 0)
pg_fatal("could not read file \"%s\": %m",
astreamer_content(privateInfo->archive_streamer, NULL,
privateInfo->archive_read_buf, rc,
ASTREAMER_UNKNOWN);
+ else
+ {
+ /*
+ * We reached EOF, but there is probably still data queued in the
+ * astreamer pipeline's buffers. Flush it out to ensure that we
+ * process everything.
+ */
+ astreamer_finalize(privateInfo->archive_streamer);
+ /* Set flag to ensure we don't finalize more than once. */
+ privateInfo->archive_fd_eof = true;
+ }
- return rc;
+ return true;
}
/*