#include <limits.h> /* INT_MAX */
#include <signal.h>
#include "timefn.h" /* UTIL_getTime, UTIL_clockSpanMicro */
+#include "../lib/common/pool.h"
+#include "../lib/common/threading.h"
#if defined (_MSC_VER)
# include <sys/stat.h>
/* IO preferences */
U32 removeSrcFile;
U32 overwrite;
+ U32 asyncIO;
/* Computation resources preferences */
unsigned memLimit;
ret->literalCompressionMode = ZSTD_ps_auto;
ret->excludeCompressedFiles = 0;
ret->allowBlockDevices = 0;
+ ret->asyncIO = 0;
return ret;
}
prefs->contentSize = value != 0;
}
+void FIO_setAsyncIOFlag(FIO_prefs_t* const prefs, unsigned value) {
+ prefs->asyncIO = value;
+}
+
/* FIO_ctx_t functions */
void FIO_setHasStdoutOutput(FIO_ctx_t* const fCtx, int value) {
static const char* checked_index(const char* options[], size_t length, size_t index) {
assert(index < length);
- // Necessary to avoid warnings since -O3 will omit the above `assert`
+ /* Necessary to avoid warnings since -O3 will omit the above `assert` */
(void) length;
return options[index];
}
/* **************************************************************************
* Decompression
***************************************************************************/
+#define DECOMPRESSION_MAX_WRITE_JOBS (10)
+
+typedef struct {
+ /* These struct fields should be set only on creation and not changed afterwards */
+ POOL_ctx* writerPool;
+ int totalWriteJobs;
+ FIO_prefs_t* prefs;
+
+ /* Controls the file we currently write to, make changes only by using provided utility functions */
+ FILE* dstFile;
+ unsigned storedSkips;
+
+ /* The jobs and availableWriteJobs fields are access by both the main and writer threads and should
+ * only be mutated after locking the mutex */
+ ZSTD_pthread_mutex_t writeJobsMutex;
+ void* jobs[DECOMPRESSION_MAX_WRITE_JOBS];
+ int availableWriteJobs;
+} write_pool_ctx_t;
+
+typedef struct {
+ /* These fields are automaically set and shouldn't be changed by non WritePool code. */
+ write_pool_ctx_t *ctx;
+ FILE* dstFile;
+ void *buffer;
+ size_t bufferSize;
+
+ /* This field should be changed before a job is queued for execution and should contain the number
+ * of bytes to write from the buffer. */
+ size_t usedBufferSize;
+} write_job_t;
+
typedef struct {
void* srcBuffer;
size_t srcBufferSize;
size_t srcBufferLoaded;
- void* dstBuffer;
- size_t dstBufferSize;
ZSTD_DStream* dctx;
- FILE* dstFile;
+ write_pool_ctx_t *writePoolCtx;
} dRess_t;
+static write_job_t *FIO_createWriteJob(write_pool_ctx_t *ctx) {
+ void *buffer;
+ write_job_t *job;
+ job = (write_job_t*) malloc(sizeof(write_job_t));
+ buffer = malloc(ZSTD_DStreamOutSize());
+ if(!job || !buffer)
+ EXM_THROW(101, "Allocation error : not enough memory");
+ job->buffer = buffer;
+ job->bufferSize = ZSTD_DStreamOutSize();
+ job->usedBufferSize = 0;
+ job->dstFile = NULL;
+ job->ctx = ctx;
+ return job;
+}
+
+/* WritePool_createThreadPool:
+ * Creates a thread pool and a mutex for threaded write pool.
+ * Displays warning if asyncio is requested but MT isn't available. */
+static void WritePool_createThreadPool(write_pool_ctx_t *ctx, const FIO_prefs_t *prefs) {
+ ctx->writerPool = NULL;
+ if(prefs->asyncIO) {
+#ifdef ZSTD_MULTITHREAD
+ if (ZSTD_pthread_mutex_init(&ctx->writeJobsMutex, NULL))
+ EXM_THROW(102, "Failed creating write jobs mutex");
+ /* We want DECOMPRESSION_MAX_WRITE_JOBS-2 queue items because we need to always have 1 free buffer to
+ * decompress into and 1 buffer that's actively written to disk and owned by the writing thread. */
+ assert(DECOMPRESSION_MAX_WRITE_JOBS >= 2);
+ ctx->writerPool = POOL_create(1, DECOMPRESSION_MAX_WRITE_JOBS - 2);
+ if (!ctx->writerPool)
+ EXM_THROW(103, "Failed creating writer thread pool");
+#else
+ DISPLAYLEVEL(2, "Note : asyncio decompression is disabled (lack of multithreading support) \n");
+#endif
+ }
+}
+
+/* WritePool_create:
+ * Allocates and sets and a new write pool including its included jobs. */
+static write_pool_ctx_t* WritePool_create(FIO_prefs_t* const prefs) {
+ write_pool_ctx_t *ctx;
+ int i;
+ ctx = (write_pool_ctx_t*) malloc(sizeof(write_pool_ctx_t));
+ if(!ctx)
+ EXM_THROW(100, "Allocation error : not enough memory");
+ WritePool_createThreadPool(ctx, prefs);
+ ctx->prefs = prefs;
+ ctx->totalWriteJobs = ctx->writerPool ? DECOMPRESSION_MAX_WRITE_JOBS : 1;
+ ctx->availableWriteJobs = ctx->totalWriteJobs;
+ for(i=0; i < ctx->availableWriteJobs; i++) {
+ ctx->jobs[i] = FIO_createWriteJob(ctx);
+ }
+ ctx->storedSkips = 0;
+ ctx->dstFile = NULL;
+ return ctx;
+}
+
+/* WritePool_free:
+ * Release a previously allocated write thread pool. Makes sure all takss are done and released. */
+static void WritePool_free(write_pool_ctx_t* ctx) {
+ int i=0;
+ if(ctx->writerPool) {
+ /* Make sure we finish all tasks and then free the resources */
+ POOL_joinJobs(ctx->writerPool);
+ /* Make sure we are not leaking jobs */
+ assert(ctx->availableWriteJobs==ctx->totalWriteJobs);
+ POOL_free(ctx->writerPool);
+ ZSTD_pthread_mutex_destroy(&ctx->writeJobsMutex);
+ }
+ assert(ctx->dstFile==NULL);
+ assert(ctx->storedSkips==0);
+ for(i=0; i<ctx->availableWriteJobs; i++) {
+ write_job_t* job = (write_job_t*) ctx->jobs[i];
+ free(job->buffer);
+ free(job);
+ }
+ free(ctx);
+}
+
+
static dRess_t FIO_createDResources(FIO_prefs_t* const prefs, const char* dictFileName)
{
dRess_t ress;
ress.srcBufferSize = ZSTD_DStreamInSize();
ress.srcBuffer = malloc(ress.srcBufferSize);
- ress.dstBufferSize = ZSTD_DStreamOutSize();
- ress.dstBuffer = malloc(ress.dstBufferSize);
- if (!ress.srcBuffer || !ress.dstBuffer)
+ if (!ress.srcBuffer)
EXM_THROW(61, "Allocation error : not enough memory");
/* dictionary */
free(dictBuffer);
}
+ ress.writePoolCtx = WritePool_create(prefs);
+
return ress;
}
{
CHECK( ZSTD_freeDStream(ress.dctx) );
free(ress.srcBuffer);
- free(ress.dstBuffer);
+ WritePool_free(ress.writePoolCtx);
}
+/* FIO_consumeDSrcBuffer:
+ * Consumes len bytes from srcBuffer's start and moves the remaining data and srcBufferLoaded accordingly. */
+static void FIO_consumeDSrcBuffer(dRess_t *ress, size_t len) {
+ assert(ress->srcBufferLoaded >= len);
+ ress->srcBufferLoaded -= len;
+ memmove(ress->srcBuffer, (char *)ress->srcBuffer + len, ress->srcBufferLoaded);
+}
/** FIO_fwriteSparse() :
* @return : storedSkips,
} }
}
+/* WritePool_releaseWriteJob:
+ * Releases an acquired job back to the pool. Doesn't execute the job. */
+static void WritePool_releaseWriteJob(write_job_t *job) {
+ write_pool_ctx_t *ctx = job->ctx;
+ if(ctx->writerPool) {
+ ZSTD_pthread_mutex_lock(&ctx->writeJobsMutex);
+ assert(ctx->availableWriteJobs < DECOMPRESSION_MAX_WRITE_JOBS);
+ ctx->jobs[ctx->availableWriteJobs++] = job;
+ ZSTD_pthread_mutex_unlock(&ctx->writeJobsMutex);
+ } else {
+ ctx->availableWriteJobs++;
+ }
+}
+
+/* WritePool_acquireWriteJob:
+ * Returns an available write job to be used for a future write. */
+static write_job_t* WritePool_acquireWriteJob(write_pool_ctx_t *ctx) {
+ write_job_t *job;
+ assert(ctx->dstFile!=NULL || ctx->prefs->testMode);
+ if(ctx->writerPool) {
+ ZSTD_pthread_mutex_lock(&ctx->writeJobsMutex);
+ assert(ctx->availableWriteJobs > 0);
+ job = (write_job_t*) ctx->jobs[--ctx->availableWriteJobs];
+ ZSTD_pthread_mutex_unlock(&ctx->writeJobsMutex);
+ } else {
+ assert(ctx->availableWriteJobs==1);
+ ctx->availableWriteJobs--;
+ job = (write_job_t*)ctx->jobs[0];
+ }
+ job->usedBufferSize = 0;
+ job->dstFile = ctx->dstFile;
+ return job;
+}
+
+/* WritePool_executeWriteJob:
+ * Executes a write job synchronously. Can be used as a function for a thread pool. */
+static void WritePool_executeWriteJob(void* opaque){
+ write_job_t* job = (write_job_t*) opaque;
+ write_pool_ctx_t* ctx = job->ctx;
+ ctx->storedSkips = FIO_fwriteSparse(job->dstFile, job->buffer, job->usedBufferSize, ctx->prefs, ctx->storedSkips);
+ WritePool_releaseWriteJob(job);
+}
+
+/* WritePool_queueWriteJob:
+ * Queues a write job for execution.
+ * Make sure to set `usedBufferSize` to the wanted length before call.
+ * The queued job shouldn't be used directly after queueing it. */
+static void WritePool_queueWriteJob(write_job_t *job) {
+ write_pool_ctx_t* ctx = job->ctx;
+ if(ctx->writerPool)
+ POOL_add(ctx->writerPool, WritePool_executeWriteJob, job);
+ else
+ WritePool_executeWriteJob(job);
+}
+
+/* WritePool_queueAndReacquireWriteJob:
+ * Queues a write job for execution and acquires a new one.
+ * After execution `job`'s pointed value would change to the newly acquired job.
+ * Make sure to set `usedBufferSize` to the wanted length before call.
+ * The queued job shouldn't be used directly after queueing it. */
+static void WritePool_queueAndReacquireWriteJob(write_job_t **job) {
+ WritePool_queueWriteJob(*job);
+ *job = WritePool_acquireWriteJob((*job)->ctx);
+}
+
+/* WritePool_sparseWriteEnd:
+ * Ends sparse writes to the current dstFile.
+ * Blocks on completion of all current write jobs before executing. */
+static void WritePool_sparseWriteEnd(write_pool_ctx_t* ctx) {
+ assert(ctx != NULL);
+ if(ctx->writerPool)
+ POOL_joinJobs(ctx->writerPool);
+ FIO_fwriteSparseEnd(ctx->prefs, ctx->dstFile, ctx->storedSkips);
+ ctx->storedSkips = 0;
+}
+
+/* WritePool_setDstFile:
+ * Sets the destination file for future files in the pool.
+ * Requires completion of all queues write jobs and release of all otherwise acquired jobs.
+ * Also requires ending of sparse write if a previous file was used in sparse mode. */
+static void WritePool_setDstFile(write_pool_ctx_t *ctx, FILE* dstFile) {
+ assert(ctx!=NULL);
+ /* We can change the dst file only if we have finished writing */
+ if(ctx->writerPool)
+ POOL_joinJobs(ctx->writerPool);
+ assert(ctx->storedSkips == 0);
+ assert(ctx->availableWriteJobs == ctx->totalWriteJobs);
+ ctx->dstFile = dstFile;
+}
+
+/* WritePool_closeDstFile:
+ * Ends sparse write and closes the writePool's current dstFile and sets the dstFile to NULL.
+ * Requires completion of all queues write jobs and release of all otherwise acquired jobs. */
+static int WritePool_closeDstFile(write_pool_ctx_t *ctx) {
+ FILE *dstFile = ctx->dstFile;
+ assert(dstFile!=NULL || ctx->prefs->testMode!=0);
+ WritePool_sparseWriteEnd(ctx);
+ WritePool_setDstFile(ctx, NULL);
+ return fclose(dstFile);
+}
/** FIO_passThrough() : just copy input into output, for compatibility with gzip -df mode
@return : 0 (no error) */
U64 alreadyDecoded) /* for multi-frames streams */
{
U64 frameSize = 0;
- U32 storedSkips = 0;
+ write_job_t *writeJob = WritePool_acquireWriteJob(ress->writePoolCtx);
/* display last 20 characters only */
{ size_t const srcFileLength = strlen(srcFileName);
/* Main decompression Loop */
while (1) {
ZSTD_inBuffer inBuff = { ress->srcBuffer, ress->srcBufferLoaded, 0 };
- ZSTD_outBuffer outBuff= { ress->dstBuffer, ress->dstBufferSize, 0 };
+ ZSTD_outBuffer outBuff= { writeJob->buffer, writeJob->bufferSize, 0 };
size_t const readSizeHint = ZSTD_decompressStream(ress->dctx, &outBuff, &inBuff);
const int displayLevel = (g_display_prefs.progressSetting == FIO_ps_always) ? 1 : 2;
UTIL_HumanReadableSize_t const hrs = UTIL_makeHumanReadableSize(alreadyDecoded+frameSize);
}
/* Write block */
- storedSkips = FIO_fwriteSparse(ress->dstFile, ress->dstBuffer, outBuff.pos, prefs, storedSkips);
+ writeJob->usedBufferSize = outBuff.pos;
+ WritePool_queueAndReacquireWriteJob(&writeJob);
frameSize += outBuff.pos;
if (fCtx->nbFilesTotal > 1) {
size_t srcFileNameSize = strlen(srcFileName);
srcFileName, hrs.precision, hrs.value, hrs.suffix);
}
- if (inBuff.pos > 0) {
- memmove(ress->srcBuffer, (char*)ress->srcBuffer + inBuff.pos, inBuff.size - inBuff.pos);
- ress->srcBufferLoaded -= inBuff.pos;
- }
+ FIO_consumeDSrcBuffer(ress, inBuff.pos);
if (readSizeHint == 0) break; /* end of frame */
ress->srcBufferLoaded += readSize;
} } }
- FIO_fwriteSparseEnd(prefs, ress->dstFile, storedSkips);
+ WritePool_releaseWriteJob(writeJob);
+ WritePool_sparseWriteEnd(ress->writePoolCtx);
return frameSize;
}
#ifdef ZSTD_GZDECOMPRESS
static unsigned long long
-FIO_decompressGzFrame(dRess_t* ress, FILE* srcFile,
- const FIO_prefs_t* const prefs,
- const char* srcFileName)
+FIO_decompressGzFrame(dRess_t* ress, FILE* srcFile, const char* srcFileName)
{
unsigned long long outFileSize = 0;
z_stream strm;
int flush = Z_NO_FLUSH;
int decodingError = 0;
- unsigned storedSkips = 0;
+ write_job_t *writeJob = NULL;
strm.zalloc = Z_NULL;
strm.zfree = Z_NULL;
if (inflateInit2(&strm, 15 /* maxWindowLogSize */ + 16 /* gzip only */) != Z_OK)
return FIO_ERROR_FRAME_DECODING;
- strm.next_out = (Bytef*)ress->dstBuffer;
- strm.avail_out = (uInt)ress->dstBufferSize;
+ writeJob = WritePool_acquireWriteJob(ress->writePoolCtx);
+ strm.next_out = (Bytef*)writeJob->buffer;
+ strm.avail_out = (uInt)writeJob->bufferSize;
strm.avail_in = (uInt)ress->srcBufferLoaded;
strm.next_in = (z_const unsigned char*)ress->srcBuffer;
DISPLAYLEVEL(1, "zstd: %s: inflate error %d \n", srcFileName, ret);
decodingError = 1; break;
}
- { size_t const decompBytes = ress->dstBufferSize - strm.avail_out;
+ { size_t const decompBytes = writeJob->bufferSize - strm.avail_out;
if (decompBytes) {
- storedSkips = FIO_fwriteSparse(ress->dstFile, ress->dstBuffer, decompBytes, prefs, storedSkips);
+ writeJob->usedBufferSize = decompBytes;
+ WritePool_queueAndReacquireWriteJob(&writeJob);
outFileSize += decompBytes;
- strm.next_out = (Bytef*)ress->dstBuffer;
- strm.avail_out = (uInt)ress->dstBufferSize;
+ strm.next_out = (Bytef*)writeJob->buffer;
+ strm.avail_out = (uInt)writeJob->bufferSize;
}
}
if (ret == Z_STREAM_END) break;
}
- if (strm.avail_in > 0)
- memmove(ress->srcBuffer, strm.next_in, strm.avail_in);
- ress->srcBufferLoaded = strm.avail_in;
+ FIO_consumeDSrcBuffer(ress, ress->srcBufferLoaded - strm.avail_in);
+
if ( (inflateEnd(&strm) != Z_OK) /* release resources ; error detected */
&& (decodingError==0) ) {
DISPLAYLEVEL(1, "zstd: %s: inflateEnd error \n", srcFileName);
decodingError = 1;
}
- FIO_fwriteSparseEnd(prefs, ress->dstFile, storedSkips);
+ WritePool_releaseWriteJob(writeJob);
+ WritePool_sparseWriteEnd(ress->writePoolCtx);
return decodingError ? FIO_ERROR_FRAME_DECODING : outFileSize;
}
#endif
-
#ifdef ZSTD_LZMADECOMPRESS
static unsigned long long
FIO_decompressLzmaFrame(dRess_t* ress, FILE* srcFile,
- const FIO_prefs_t* const prefs,
const char* srcFileName, int plain_lzma)
{
unsigned long long outFileSize = 0;
lzma_action action = LZMA_RUN;
lzma_ret initRet;
int decodingError = 0;
- unsigned storedSkips = 0;
+ write_job_t *writeJob = NULL;
strm.next_in = 0;
strm.avail_in = 0;
return FIO_ERROR_FRAME_DECODING;
}
- strm.next_out = (BYTE*)ress->dstBuffer;
- strm.avail_out = ress->dstBufferSize;
+ writeJob = WritePool_acquireWriteJob(ress->writePoolCtx);
+ strm.next_out = (Bytef*)writeJob->buffer;
+ strm.avail_out = (uInt)writeJob->bufferSize;
strm.next_in = (BYTE const*)ress->srcBuffer;
strm.avail_in = ress->srcBufferLoaded;
srcFileName, ret);
decodingError = 1; break;
}
- { size_t const decompBytes = ress->dstBufferSize - strm.avail_out;
+ { size_t const decompBytes = writeJob->bufferSize - strm.avail_out;
if (decompBytes) {
- storedSkips = FIO_fwriteSparse(ress->dstFile, ress->dstBuffer, decompBytes, prefs, storedSkips);
+ writeJob->usedBufferSize = decompBytes;
+ WritePool_queueAndReacquireWriteJob(&writeJob);
outFileSize += decompBytes;
- strm.next_out = (BYTE*)ress->dstBuffer;
- strm.avail_out = ress->dstBufferSize;
+ strm.next_out = (Bytef*)writeJob->buffer;
+ strm.avail_out = writeJob->bufferSize;
} }
if (ret == LZMA_STREAM_END) break;
}
- if (strm.avail_in > 0)
- memmove(ress->srcBuffer, strm.next_in, strm.avail_in);
- ress->srcBufferLoaded = strm.avail_in;
+ FIO_consumeDSrcBuffer(ress, ress->srcBufferLoaded - strm.avail_in);
lzma_end(&strm);
- FIO_fwriteSparseEnd(prefs, ress->dstFile, storedSkips);
+ WritePool_releaseWriteJob(writeJob);
+ WritePool_sparseWriteEnd(ress->writePoolCtx);
return decodingError ? FIO_ERROR_FRAME_DECODING : outFileSize;
}
#endif
#ifdef ZSTD_LZ4DECOMPRESS
static unsigned long long
FIO_decompressLz4Frame(dRess_t* ress, FILE* srcFile,
- const FIO_prefs_t* const prefs,
const char* srcFileName)
{
unsigned long long filesize = 0;
- LZ4F_errorCode_t nextToLoad;
+ LZ4F_errorCode_t nextToLoad = 4;
LZ4F_decompressionContext_t dCtx;
LZ4F_errorCode_t const errorCode = LZ4F_createDecompressionContext(&dCtx, LZ4F_VERSION);
int decodingError = 0;
- unsigned storedSkips = 0;
+ write_job_t *writeJob = WritePool_acquireWriteJob(ress->writePoolCtx);
if (LZ4F_isError(errorCode)) {
DISPLAYLEVEL(1, "zstd: failed to create lz4 decompression context \n");
return FIO_ERROR_FRAME_DECODING;
}
- /* Init feed with magic number (already consumed from FILE* sFile) */
- { size_t inSize = 4;
- size_t outSize= 0;
- MEM_writeLE32(ress->srcBuffer, LZ4_MAGICNUMBER);
- nextToLoad = LZ4F_decompress(dCtx, ress->dstBuffer, &outSize, ress->srcBuffer, &inSize, NULL);
- if (LZ4F_isError(nextToLoad)) {
- DISPLAYLEVEL(1, "zstd: %s: lz4 header error : %s \n",
- srcFileName, LZ4F_getErrorName(nextToLoad));
- LZ4F_freeDecompressionContext(dCtx);
- return FIO_ERROR_FRAME_DECODING;
- } }
-
/* Main Loop */
for (;nextToLoad;) {
size_t readSize;
size_t pos = 0;
- size_t decodedBytes = ress->dstBufferSize;
+ size_t decodedBytes = writeJob->bufferSize;
+ int fullBufferDecoded = 0;
/* Read input */
- if (nextToLoad > ress->srcBufferSize) nextToLoad = ress->srcBufferSize;
- readSize = fread(ress->srcBuffer, 1, nextToLoad, srcFile);
- if (!readSize) break; /* reached end of file or stream */
+ nextToLoad = MIN(nextToLoad, ress->srcBufferSize-ress->srcBufferLoaded);
+ readSize = fread((char *)ress->srcBuffer + ress->srcBufferLoaded, 1, nextToLoad, srcFile);
+ if(!readSize && ferror(srcFile)) {
+ DISPLAYLEVEL(1, "zstd: %s: read error \n", srcFileName);
+ decodingError=1;
+ break;
+ }
+ if(!readSize && !ress->srcBufferLoaded) break; /* reached end of file */
+ ress->srcBufferLoaded += readSize;
- while ((pos < readSize) || (decodedBytes == ress->dstBufferSize)) { /* still to read, or still to flush */
+ while ((pos < ress->srcBufferLoaded) || fullBufferDecoded) { /* still to read, or still to flush */
/* Decode Input (at least partially) */
- size_t remaining = readSize - pos;
- decodedBytes = ress->dstBufferSize;
- nextToLoad = LZ4F_decompress(dCtx, ress->dstBuffer, &decodedBytes, (char*)(ress->srcBuffer)+pos, &remaining, NULL);
+ size_t remaining = ress->srcBufferLoaded - pos;
+ decodedBytes = writeJob->bufferSize;
+ nextToLoad = LZ4F_decompress(dCtx, writeJob->buffer, &decodedBytes, (char*)(ress->srcBuffer)+pos, &remaining, NULL);
if (LZ4F_isError(nextToLoad)) {
DISPLAYLEVEL(1, "zstd: %s: lz4 decompression error : %s \n",
srcFileName, LZ4F_getErrorName(nextToLoad));
decodingError = 1; nextToLoad = 0; break;
}
pos += remaining;
+ assert(pos <= ress->srcBufferLoaded);
+ fullBufferDecoded = decodedBytes == writeJob->bufferSize;
/* Write Block */
if (decodedBytes) {
UTIL_HumanReadableSize_t hrs;
- storedSkips = FIO_fwriteSparse(ress->dstFile, ress->dstBuffer, decodedBytes, prefs, storedSkips);
+ writeJob->usedBufferSize = decodedBytes;
+ WritePool_queueAndReacquireWriteJob(&writeJob);
filesize += decodedBytes;
hrs = UTIL_makeHumanReadableSize(filesize);
DISPLAYUPDATE(2, "\rDecompressed : %.*f%s ", hrs.precision, hrs.value, hrs.suffix);
if (!nextToLoad) break;
}
+ FIO_consumeDSrcBuffer(ress, pos);
}
- /* can be out because readSize == 0, which could be an fread() error */
- if (ferror(srcFile)) {
- DISPLAYLEVEL(1, "zstd: %s: read error \n", srcFileName);
- decodingError=1;
- }
-
if (nextToLoad!=0) {
DISPLAYLEVEL(1, "zstd: %s: unfinished lz4 stream \n", srcFileName);
decodingError=1;
}
LZ4F_freeDecompressionContext(dCtx);
- ress->srcBufferLoaded = 0; /* LZ4F will reach exact frame boundary */
- FIO_fwriteSparseEnd(prefs, ress->dstFile, storedSkips);
+ WritePool_releaseWriteJob(writeJob);
+ WritePool_sparseWriteEnd(ress->writePoolCtx);
return decodingError ? FIO_ERROR_FRAME_DECODING : filesize;
}
filesize += frameSize;
} else if (buf[0] == 31 && buf[1] == 139) { /* gz magic number */
#ifdef ZSTD_GZDECOMPRESS
- unsigned long long const frameSize = FIO_decompressGzFrame(&ress, srcFile, prefs, srcFileName);
+ unsigned long long const frameSize = FIO_decompressGzFrame(&ress, srcFile, srcFileName);
if (frameSize == FIO_ERROR_FRAME_DECODING) return 1;
filesize += frameSize;
#else
} else if ((buf[0] == 0xFD && buf[1] == 0x37) /* xz magic number */
|| (buf[0] == 0x5D && buf[1] == 0x00)) { /* lzma header (no magic number) */
#ifdef ZSTD_LZMADECOMPRESS
- unsigned long long const frameSize = FIO_decompressLzmaFrame(&ress, srcFile, prefs, srcFileName, buf[0] != 0xFD);
+ unsigned long long const frameSize = FIO_decompressLzmaFrame(&ress, srcFile, srcFileName, buf[0] != 0xFD);
if (frameSize == FIO_ERROR_FRAME_DECODING) return 1;
filesize += frameSize;
#else
#endif
} else if (MEM_readLE32(buf) == LZ4_MAGICNUMBER) {
#ifdef ZSTD_LZ4DECOMPRESS
- unsigned long long const frameSize = FIO_decompressLz4Frame(&ress, srcFile, prefs, srcFileName);
+ unsigned long long const frameSize = FIO_decompressLz4Frame(&ress, srcFile, srcFileName);
if (frameSize == FIO_ERROR_FRAME_DECODING) return 1;
filesize += frameSize;
#else
#endif
} else if ((prefs->overwrite) && !strcmp (dstFileName, stdoutmark)) { /* pass-through mode */
return FIO_passThrough(prefs,
- ress.dstFile, srcFile,
+ ress.writePoolCtx->dstFile, srcFile,
ress.srcBuffer, ress.srcBufferSize,
ress.srcBufferLoaded);
} else {
int releaseDstFile = 0;
int transferMTime = 0;
- if ((ress.dstFile == NULL) && (prefs->testMode==0)) {
+ if ((ress.writePoolCtx->dstFile == NULL) && (prefs->testMode==0)) {
+ FILE *dstFile;
int dstFilePermissions = DEFAULT_FILE_PERMISSIONS;
if ( strcmp(srcFileName, stdinmark) /* special case : don't transfer permissions from stdin */
&& strcmp(dstFileName, stdoutmark)
releaseDstFile = 1;
- ress.dstFile = FIO_openDstFile(fCtx, prefs, srcFileName, dstFileName, dstFilePermissions);
- if (ress.dstFile==NULL) return 1;
+ dstFile = FIO_openDstFile(fCtx, prefs, srcFileName, dstFileName, dstFilePermissions);
+ if (dstFile==NULL) return 1;
+ WritePool_setDstFile(ress.writePoolCtx, dstFile);
/* Must only be added after FIO_openDstFile() succeeds.
* Otherwise we may delete the destination file if it already exists,
result = FIO_decompressFrames(fCtx, ress, srcFile, prefs, dstFileName, srcFileName);
if (releaseDstFile) {
- FILE* const dstFile = ress.dstFile;
clearHandler();
- ress.dstFile = NULL;
- if (fclose(dstFile)) {
+ if (WritePool_closeDstFile(ress.writePoolCtx)) {
DISPLAYLEVEL(1, "zstd: %s: %s \n", dstFileName, strerror(errno));
result = 1;
}
return 1;
}
if (!prefs->testMode) {
- ress.dstFile = FIO_openDstFile(fCtx, prefs, NULL, outFileName, DEFAULT_FILE_PERMISSIONS);
- if (ress.dstFile == 0) EXM_THROW(19, "cannot open %s", outFileName);
+ FILE* dstFile = FIO_openDstFile(fCtx, prefs, NULL, outFileName, DEFAULT_FILE_PERMISSIONS);
+ if (dstFile == 0) EXM_THROW(19, "cannot open %s", outFileName);
+ WritePool_setDstFile(ress.writePoolCtx, dstFile);
}
for (; fCtx->currFileIdx < fCtx->nbFilesTotal; fCtx->currFileIdx++) {
status = FIO_decompressSrcFile(fCtx, prefs, ress, outFileName, srcNamesTable[fCtx->currFileIdx]);
if (!status) fCtx->nbFilesProcessed++;
error |= status;
}
- if ((!prefs->testMode) && (fclose(ress.dstFile)))
+ if ((!prefs->testMode) && (WritePool_closeDstFile(ress.writePoolCtx)))
EXM_THROW(72, "Write error : %s : cannot properly close output file",
strerror(errno));
} else {