]> git.ipfire.org Git - thirdparty/zstd.git/commitdiff
Async write for decompression (#2975)
authorYonatan Komornik <11005061+yoniko@users.noreply.github.com>
Fri, 21 Jan 2022 21:55:41 +0000 (13:55 -0800)
committerGitHub <noreply@github.com>
Fri, 21 Jan 2022 21:55:41 +0000 (13:55 -0800)
* Async IO decompression:
- Added --[no-]asyncio flag for CLI decompression.
- Replaced dstBuffer in decompression with a pool of write jobs.
- Added an ability to execute write jobs in a separate thread.
- Added an ability to wait (join) on all jobs in a thread pool (queued and running).

build/meson/programs/meson.build
lib/common/pool.c
lib/common/pool.h
programs/fileio.c
programs/fileio.h
programs/zstdcli.c
tests/playTests.sh

index 4181030c2ee339157dde4cedb61a28965a6ff5b2..0ae93fc107ca48ff77811125b8cd650ae87713d6 100644 (file)
@@ -20,14 +20,24 @@ zstd_programs_sources = [join_paths(zstd_rootdir, 'programs/zstdcli.c'),
   join_paths(zstd_rootdir, 'programs/dibio.c'),
   join_paths(zstd_rootdir, 'programs/zstdcli_trace.c'),
   # needed due to use of private symbol + -fvisibility=hidden
-  join_paths(zstd_rootdir, 'lib/common/xxhash.c')]
+  join_paths(zstd_rootdir, 'lib/common/xxhash.c'),
+  join_paths(zstd_rootdir, 'lib/common/pool.c'),
+  join_paths(zstd_rootdir, 'lib/common/zstd_common.c'),
+  join_paths(zstd_rootdir, 'lib/common/error_private.c')]
 
+zstd_deps = [ libzstd_dep ]
 zstd_c_args = libzstd_debug_cflags
+
+zstd_frugal_deps = [ libzstd_dep ]
+zstd_frugal_c_args = [ '-DZSTD_NOBENCH', '-DZSTD_NODICT', '-DZSTD_NOTRACE' ]
+
 if use_multi_thread
+  zstd_deps += [ thread_dep ]
   zstd_c_args += [ '-DZSTD_MULTITHREAD' ]
+  zstd_frugal_deps += [ thread_dep ]
+  zstd_frugal_c_args += [ '-DZSTD_MULTITHREAD' ]
 endif
 
-zstd_deps = [ libzstd_dep ]
 if use_zlib
   zstd_deps += [ zlib_dep ]
   zstd_c_args += [ '-DZSTD_GZCOMPRESS', '-DZSTD_GZDECOMPRESS' ]
@@ -69,14 +79,17 @@ zstd = executable('zstd',
 zstd_frugal_sources = [join_paths(zstd_rootdir, 'programs/zstdcli.c'),
   join_paths(zstd_rootdir, 'programs/timefn.c'),
   join_paths(zstd_rootdir, 'programs/util.c'),
-  join_paths(zstd_rootdir, 'programs/fileio.c')]
+  join_paths(zstd_rootdir, 'programs/fileio.c'),
+  join_paths(zstd_rootdir, 'lib/common/pool.c'),
+  join_paths(zstd_rootdir, 'lib/common/zstd_common.c'),
+  join_paths(zstd_rootdir, 'lib/common/error_private.c')]
 
 # Minimal target, with only zstd compression and decompression.
 # No bench. No legacy.
 executable('zstd-frugal',
   zstd_frugal_sources,
-  dependencies: libzstd_dep,
-  c_args: [ '-DZSTD_NOBENCH', '-DZSTD_NODICT', '-DZSTD_NOTRACE' ],
+  dependencies: zstd_frugal_deps,
+  c_args: zstd_frugal_c_args,
   install: true)
 
 install_data(join_paths(zstd_rootdir, 'programs/zstdgrep'),
index 2e37cdd73c81912b103ccf0a0dff8c89b4c3369a..5c1d07d356ea6ba4fb18b1ed4ae03f5d3714c9e8 100644 (file)
@@ -96,9 +96,7 @@ static void* POOL_thread(void* opaque) {
             /* If the intended queue size was 0, signal after finishing job */
             ZSTD_pthread_mutex_lock(&ctx->queueMutex);
             ctx->numThreadsBusy--;
-            if (ctx->queueSize == 1) {
-                ZSTD_pthread_cond_signal(&ctx->queuePushCond);
-            }
+            ZSTD_pthread_cond_signal(&ctx->queuePushCond);
             ZSTD_pthread_mutex_unlock(&ctx->queueMutex);
         }
     }  /* for (;;) */
@@ -190,6 +188,17 @@ void POOL_free(POOL_ctx *ctx) {
     ZSTD_customFree(ctx, ctx->customMem);
 }
 
+/*! POOL_joinJobs() :
+ *  Waits for all queued jobs to finish executing.
+ */
+void POOL_joinJobs(POOL_ctx* ctx) {
+    ZSTD_pthread_mutex_lock(&ctx->queueMutex);
+    while(!ctx->queueEmpty || ctx->numThreadsBusy > 0) {
+        ZSTD_pthread_cond_wait(&ctx->queuePushCond, &ctx->queueMutex);
+    }
+    ZSTD_pthread_mutex_unlock(&ctx->queueMutex);
+}
+
 void ZSTD_freeThreadPool (ZSTD_threadPool* pool) {
   POOL_free (pool);
 }
@@ -330,6 +339,11 @@ void POOL_free(POOL_ctx* ctx) {
     (void)ctx;
 }
 
+void POOL_joinJobs(POOL_ctx* ctx){
+    assert(!ctx || ctx == &g_poolCtx);
+    (void)ctx;
+}
+
 int POOL_resize(POOL_ctx* ctx, size_t numThreads) {
     (void)ctx; (void)numThreads;
     return 0;
index 0ebde1805db5f76ffb4e0cd761f41a7da099bb4a..b86a3452e5c80a6b133aef39f81bf75f944fab5e 100644 (file)
@@ -38,6 +38,12 @@ POOL_ctx* POOL_create_advanced(size_t numThreads, size_t queueSize,
  */
 void POOL_free(POOL_ctx* ctx);
 
+
+/*! POOL_joinJobs() :
+ *  Waits for all queued jobs to finish executing.
+ */
+void POOL_joinJobs(POOL_ctx* ctx);
+
 /*! POOL_resize() :
  *  Expands or shrinks pool's number of threads.
  *  This is more efficient than releasing + creating a new context,
index 5338fa62955badc119cbe7f97f2855bb9c3c3a99..b85e0806bcf7e526c1011169262b28728cd965da 100644 (file)
@@ -34,6 +34,8 @@
 #include <limits.h>     /* INT_MAX */
 #include <signal.h>
 #include "timefn.h"     /* UTIL_getTime, UTIL_clockSpanMicro */
+#include "../lib/common/pool.h"
+#include "../lib/common/threading.h"
 
 #if defined (_MSC_VER)
 #  include <sys/stat.h>
@@ -325,6 +327,7 @@ struct FIO_prefs_s {
     /* IO preferences */
     U32 removeSrcFile;
     U32 overwrite;
+    U32 asyncIO;
 
     /* Computation resources preferences */
     unsigned memLimit;
@@ -395,6 +398,7 @@ FIO_prefs_t* FIO_createPreferences(void)
     ret->literalCompressionMode = ZSTD_ps_auto;
     ret->excludeCompressedFiles = 0;
     ret->allowBlockDevices = 0;
+    ret->asyncIO = 0;
     return ret;
 }
 
@@ -558,6 +562,10 @@ void FIO_setContentSize(FIO_prefs_t* const prefs, int value)
     prefs->contentSize = value != 0;
 }
 
+void FIO_setAsyncIOFlag(FIO_prefs_t* const prefs, unsigned value) {
+    prefs->asyncIO = value;
+}
+
 /* FIO_ctx_t functions */
 
 void FIO_setHasStdoutOutput(FIO_ctx_t* const fCtx, int value) {
@@ -1798,7 +1806,7 @@ FIO_compressFilename_srcFile(FIO_ctx_t* const fCtx,
 
 static const char* checked_index(const char* options[], size_t length, size_t index) {
     assert(index < length);
-    // Necessary to avoid warnings since -O3 will omit the above `assert`
+    /* Necessary to avoid warnings since -O3 will omit the above `assert` */
     (void) length;
     return options[index];
 }
@@ -2000,16 +2008,124 @@ int FIO_compressMultipleFilenames(FIO_ctx_t* const fCtx,
 /* **************************************************************************
  *  Decompression
  ***************************************************************************/
+#define DECOMPRESSION_MAX_WRITE_JOBS    (10)
+
+typedef struct {
+    /* These struct fields should be set only on creation and not changed afterwards */
+    POOL_ctx* writerPool;
+    int totalWriteJobs;
+    FIO_prefs_t* prefs;
+
+    /* Controls the file we currently write to, make changes only by using provided utility functions */
+    FILE* dstFile;
+    unsigned storedSkips;
+
+    /* The jobs and availableWriteJobs fields are access by both the main and writer threads and should
+     * only be mutated after locking the mutex */
+    ZSTD_pthread_mutex_t writeJobsMutex;
+    void* jobs[DECOMPRESSION_MAX_WRITE_JOBS];
+    int availableWriteJobs;
+} write_pool_ctx_t;
+
+typedef struct {
+    /* These fields are automaically set and shouldn't be changed by non WritePool code. */
+    write_pool_ctx_t *ctx;
+    FILE* dstFile;
+    void *buffer;
+    size_t bufferSize;
+
+    /* This field should be changed before a job is queued for execution and should contain the number
+     * of bytes to write from the buffer. */
+    size_t usedBufferSize;
+} write_job_t;
+
 typedef struct {
     void*  srcBuffer;
     size_t srcBufferSize;
     size_t srcBufferLoaded;
-    void*  dstBuffer;
-    size_t dstBufferSize;
     ZSTD_DStream* dctx;
-    FILE*  dstFile;
+    write_pool_ctx_t *writePoolCtx;
 } dRess_t;
 
+static write_job_t *FIO_createWriteJob(write_pool_ctx_t *ctx) {
+    void *buffer;
+    write_job_t *job;
+    job = (write_job_t*) malloc(sizeof(write_job_t));
+    buffer = malloc(ZSTD_DStreamOutSize());
+    if(!job || !buffer)
+        EXM_THROW(101, "Allocation error : not enough memory");
+    job->buffer = buffer;
+    job->bufferSize = ZSTD_DStreamOutSize();
+    job->usedBufferSize = 0;
+    job->dstFile = NULL;
+    job->ctx = ctx;
+    return job;
+}
+
+/* WritePool_createThreadPool:
+ * Creates a thread pool and a mutex for threaded write pool.
+ * Displays warning if asyncio is requested but MT isn't available. */
+static void WritePool_createThreadPool(write_pool_ctx_t *ctx, const FIO_prefs_t *prefs) {
+    ctx->writerPool = NULL;
+    if(prefs->asyncIO) {
+#ifdef ZSTD_MULTITHREAD
+        if (ZSTD_pthread_mutex_init(&ctx->writeJobsMutex, NULL))
+            EXM_THROW(102, "Failed creating write jobs mutex");
+        /* We want DECOMPRESSION_MAX_WRITE_JOBS-2 queue items because we need to always have 1 free buffer to
+         * decompress into and 1 buffer that's actively written to disk and owned by the writing thread. */
+        assert(DECOMPRESSION_MAX_WRITE_JOBS >= 2);
+        ctx->writerPool = POOL_create(1, DECOMPRESSION_MAX_WRITE_JOBS - 2);
+        if (!ctx->writerPool)
+            EXM_THROW(103, "Failed creating writer thread pool");
+#else
+        DISPLAYLEVEL(2, "Note : asyncio decompression is disabled (lack of multithreading support) \n");
+#endif
+    }
+}
+
+/* WritePool_create:
+ * Allocates and sets and a new write pool including its included jobs. */
+static write_pool_ctx_t* WritePool_create(FIO_prefs_t* const prefs) {
+    write_pool_ctx_t *ctx;
+    int i;
+    ctx = (write_pool_ctx_t*) malloc(sizeof(write_pool_ctx_t));
+    if(!ctx)
+        EXM_THROW(100, "Allocation error : not enough memory");
+    WritePool_createThreadPool(ctx, prefs);
+    ctx->prefs = prefs;
+    ctx->totalWriteJobs = ctx->writerPool ? DECOMPRESSION_MAX_WRITE_JOBS : 1;
+    ctx->availableWriteJobs = ctx->totalWriteJobs;
+    for(i=0; i < ctx->availableWriteJobs; i++) {
+        ctx->jobs[i] = FIO_createWriteJob(ctx);
+    }
+    ctx->storedSkips = 0;
+    ctx->dstFile = NULL;
+    return ctx;
+}
+
+/* WritePool_free:
+ * Release a previously allocated write thread pool. Makes sure all takss are done and released. */
+static void WritePool_free(write_pool_ctx_t* ctx) {
+    int i=0;
+    if(ctx->writerPool) {
+        /* Make sure we finish all tasks and then free the resources */
+        POOL_joinJobs(ctx->writerPool);
+        /* Make sure we are not leaking jobs */
+        assert(ctx->availableWriteJobs==ctx->totalWriteJobs);
+        POOL_free(ctx->writerPool);
+        ZSTD_pthread_mutex_destroy(&ctx->writeJobsMutex);
+    }
+    assert(ctx->dstFile==NULL);
+    assert(ctx->storedSkips==0);
+    for(i=0; i<ctx->availableWriteJobs; i++) {
+        write_job_t* job = (write_job_t*) ctx->jobs[i];
+        free(job->buffer);
+        free(job);
+    }
+    free(ctx);
+}
+
+
 static dRess_t FIO_createDResources(FIO_prefs_t* const prefs, const char* dictFileName)
 {
     dRess_t ress;
@@ -2027,9 +2143,7 @@ static dRess_t FIO_createDResources(FIO_prefs_t* const prefs, const char* dictFi
 
     ress.srcBufferSize = ZSTD_DStreamInSize();
     ress.srcBuffer = malloc(ress.srcBufferSize);
-    ress.dstBufferSize = ZSTD_DStreamOutSize();
-    ress.dstBuffer = malloc(ress.dstBufferSize);
-    if (!ress.srcBuffer || !ress.dstBuffer)
+    if (!ress.srcBuffer)
         EXM_THROW(61, "Allocation error : not enough memory");
 
     /* dictionary */
@@ -2039,6 +2153,8 @@ static dRess_t FIO_createDResources(FIO_prefs_t* const prefs, const char* dictFi
         free(dictBuffer);
     }
 
+    ress.writePoolCtx = WritePool_create(prefs);
+
     return ress;
 }
 
@@ -2046,9 +2162,16 @@ static void FIO_freeDResources(dRess_t ress)
 {
     CHECK( ZSTD_freeDStream(ress.dctx) );
     free(ress.srcBuffer);
-    free(ress.dstBuffer);
+    WritePool_free(ress.writePoolCtx);
 }
 
+/* FIO_consumeDSrcBuffer:
+ * Consumes len bytes from srcBuffer's start and moves the remaining data and srcBufferLoaded accordingly. */
+static void FIO_consumeDSrcBuffer(dRess_t *ress, size_t len) {
+    assert(ress->srcBufferLoaded >= len);
+    ress->srcBufferLoaded -= len;
+    memmove(ress->srcBuffer, (char *)ress->srcBuffer + len, ress->srcBufferLoaded);
+}
 
 /** FIO_fwriteSparse() :
 *  @return : storedSkips,
@@ -2148,6 +2271,106 @@ FIO_fwriteSparseEnd(const FIO_prefs_t* const prefs, FILE* file, unsigned storedS
     }   }
 }
 
+/* WritePool_releaseWriteJob:
+ * Releases an acquired job back to the pool. Doesn't execute the job. */
+static void WritePool_releaseWriteJob(write_job_t *job) {
+    write_pool_ctx_t *ctx = job->ctx;
+    if(ctx->writerPool) {
+        ZSTD_pthread_mutex_lock(&ctx->writeJobsMutex);
+        assert(ctx->availableWriteJobs < DECOMPRESSION_MAX_WRITE_JOBS);
+        ctx->jobs[ctx->availableWriteJobs++] = job;
+        ZSTD_pthread_mutex_unlock(&ctx->writeJobsMutex);
+    } else {
+        ctx->availableWriteJobs++;
+    }
+}
+
+/* WritePool_acquireWriteJob:
+ * Returns an available write job to be used for a future write. */
+static write_job_t* WritePool_acquireWriteJob(write_pool_ctx_t *ctx) {
+    write_job_t *job;
+    assert(ctx->dstFile!=NULL || ctx->prefs->testMode);
+    if(ctx->writerPool) {
+        ZSTD_pthread_mutex_lock(&ctx->writeJobsMutex);
+        assert(ctx->availableWriteJobs > 0);
+        job = (write_job_t*) ctx->jobs[--ctx->availableWriteJobs];
+        ZSTD_pthread_mutex_unlock(&ctx->writeJobsMutex);
+    } else {
+        assert(ctx->availableWriteJobs==1);
+        ctx->availableWriteJobs--;
+        job = (write_job_t*)ctx->jobs[0];
+    }
+    job->usedBufferSize = 0;
+    job->dstFile = ctx->dstFile;
+    return job;
+}
+
+/* WritePool_executeWriteJob:
+ * Executes a write job synchronously. Can be used as a function for a thread pool. */
+static void WritePool_executeWriteJob(void* opaque){
+    write_job_t* job = (write_job_t*) opaque;
+    write_pool_ctx_t* ctx = job->ctx;
+    ctx->storedSkips = FIO_fwriteSparse(job->dstFile, job->buffer, job->usedBufferSize, ctx->prefs, ctx->storedSkips);
+    WritePool_releaseWriteJob(job);
+}
+
+/* WritePool_queueWriteJob:
+ * Queues a write job for execution.
+ * Make sure to set `usedBufferSize` to the wanted length before call.
+ * The queued job shouldn't be used directly after queueing it. */
+static void WritePool_queueWriteJob(write_job_t *job) {
+    write_pool_ctx_t* ctx = job->ctx;
+    if(ctx->writerPool)
+        POOL_add(ctx->writerPool, WritePool_executeWriteJob, job);
+    else
+        WritePool_executeWriteJob(job);
+}
+
+/* WritePool_queueAndReacquireWriteJob:
+ * Queues a write job for execution and acquires a new one.
+ * After execution `job`'s pointed value would change to the newly acquired job.
+ * Make sure to set `usedBufferSize` to the wanted length before call.
+ * The queued job shouldn't be used directly after queueing it. */
+static void WritePool_queueAndReacquireWriteJob(write_job_t **job) {
+    WritePool_queueWriteJob(*job);
+    *job = WritePool_acquireWriteJob((*job)->ctx);
+}
+
+/* WritePool_sparseWriteEnd:
+ * Ends sparse writes to the current dstFile.
+ * Blocks on completion of all current write jobs before executing. */
+static void WritePool_sparseWriteEnd(write_pool_ctx_t* ctx) {
+    assert(ctx != NULL);
+    if(ctx->writerPool)
+        POOL_joinJobs(ctx->writerPool);
+    FIO_fwriteSparseEnd(ctx->prefs, ctx->dstFile, ctx->storedSkips);
+    ctx->storedSkips = 0;
+}
+
+/* WritePool_setDstFile:
+ * Sets the destination file for future files in the pool.
+ * Requires completion of all queues write jobs and release of all otherwise acquired jobs.
+ * Also requires ending of sparse write if a previous file was used in sparse mode. */
+static void WritePool_setDstFile(write_pool_ctx_t *ctx, FILE* dstFile) {
+    assert(ctx!=NULL);
+    /* We can change the dst file only if we have finished writing */
+    if(ctx->writerPool)
+        POOL_joinJobs(ctx->writerPool);
+    assert(ctx->storedSkips == 0);
+    assert(ctx->availableWriteJobs == ctx->totalWriteJobs);
+    ctx->dstFile = dstFile;
+}
+
+/* WritePool_closeDstFile:
+ * Ends sparse write and closes the writePool's current dstFile and sets the dstFile to NULL.
+ * Requires completion of all queues write jobs and release of all otherwise acquired jobs.  */
+static int WritePool_closeDstFile(write_pool_ctx_t *ctx) {
+    FILE *dstFile = ctx->dstFile;
+    assert(dstFile!=NULL || ctx->prefs->testMode!=0);
+    WritePool_sparseWriteEnd(ctx);
+    WritePool_setDstFile(ctx, NULL);
+    return fclose(dstFile);
+}
 
 /** FIO_passThrough() : just copy input into output, for compatibility with gzip -df mode
     @return : 0 (no error) */
@@ -2224,7 +2447,7 @@ FIO_decompressZstdFrame(FIO_ctx_t* const fCtx, dRess_t* ress, FILE* finput,
                         U64 alreadyDecoded)  /* for multi-frames streams */
 {
     U64 frameSize = 0;
-    U32 storedSkips = 0;
+    write_job_t *writeJob = WritePool_acquireWriteJob(ress->writePoolCtx);
 
     /* display last 20 characters only */
     {   size_t const srcFileLength = strlen(srcFileName);
@@ -2244,7 +2467,7 @@ FIO_decompressZstdFrame(FIO_ctx_t* const fCtx, dRess_t* ress, FILE* finput,
     /* Main decompression Loop */
     while (1) {
         ZSTD_inBuffer  inBuff = { ress->srcBuffer, ress->srcBufferLoaded, 0 };
-        ZSTD_outBuffer outBuff= { ress->dstBuffer, ress->dstBufferSize, 0 };
+        ZSTD_outBuffer outBuff= { writeJob->buffer, writeJob->bufferSize, 0 };
         size_t const readSizeHint = ZSTD_decompressStream(ress->dctx, &outBuff, &inBuff);
         const int displayLevel = (g_display_prefs.progressSetting == FIO_ps_always) ? 1 : 2;
         UTIL_HumanReadableSize_t const hrs = UTIL_makeHumanReadableSize(alreadyDecoded+frameSize);
@@ -2256,7 +2479,8 @@ FIO_decompressZstdFrame(FIO_ctx_t* const fCtx, dRess_t* ress, FILE* finput,
         }
 
         /* Write block */
-        storedSkips = FIO_fwriteSparse(ress->dstFile, ress->dstBuffer, outBuff.pos, prefs, storedSkips);
+        writeJob->usedBufferSize = outBuff.pos;
+        WritePool_queueAndReacquireWriteJob(&writeJob);
         frameSize += outBuff.pos;
         if (fCtx->nbFilesTotal > 1) {
             size_t srcFileNameSize = strlen(srcFileName);
@@ -2273,10 +2497,7 @@ FIO_decompressZstdFrame(FIO_ctx_t* const fCtx, dRess_t* ress, FILE* finput,
                             srcFileName, hrs.precision, hrs.value, hrs.suffix);
         }
 
-        if (inBuff.pos > 0) {
-            memmove(ress->srcBuffer, (char*)ress->srcBuffer + inBuff.pos, inBuff.size - inBuff.pos);
-            ress->srcBufferLoaded -= inBuff.pos;
-        }
+        FIO_consumeDSrcBuffer(ress, inBuff.pos);
 
         if (readSizeHint == 0) break;   /* end of frame */
 
@@ -2294,7 +2515,8 @@ FIO_decompressZstdFrame(FIO_ctx_t* const fCtx, dRess_t* ress, FILE* finput,
                 ress->srcBufferLoaded += readSize;
     }   }   }
 
-    FIO_fwriteSparseEnd(prefs, ress->dstFile, storedSkips);
+    WritePool_releaseWriteJob(writeJob);
+    WritePool_sparseWriteEnd(ress->writePoolCtx);
 
     return frameSize;
 }
@@ -2302,15 +2524,13 @@ FIO_decompressZstdFrame(FIO_ctx_t* const fCtx, dRess_t* ress, FILE* finput,
 
 #ifdef ZSTD_GZDECOMPRESS
 static unsigned long long
-FIO_decompressGzFrame(dRess_t* ress, FILE* srcFile,
-                      const FIO_prefs_t* const prefs,
-                      const char* srcFileName)
+FIO_decompressGzFrame(dRess_t* ress, FILE* srcFile, const char* srcFileName)
 {
     unsigned long long outFileSize = 0;
     z_stream strm;
     int flush = Z_NO_FLUSH;
     int decodingError = 0;
-    unsigned storedSkips = 0;
+    write_job_t *writeJob = NULL;
 
     strm.zalloc = Z_NULL;
     strm.zfree = Z_NULL;
@@ -2321,8 +2541,9 @@ FIO_decompressGzFrame(dRess_t* ress, FILE* srcFile,
     if (inflateInit2(&strm, 15 /* maxWindowLogSize */ + 16 /* gzip only */) != Z_OK)
         return FIO_ERROR_FRAME_DECODING;
 
-    strm.next_out = (Bytef*)ress->dstBuffer;
-    strm.avail_out = (uInt)ress->dstBufferSize;
+    writeJob = WritePool_acquireWriteJob(ress->writePoolCtx);
+    strm.next_out = (Bytef*)writeJob->buffer;
+    strm.avail_out = (uInt)writeJob->bufferSize;
     strm.avail_in = (uInt)ress->srcBufferLoaded;
     strm.next_in = (z_const unsigned char*)ress->srcBuffer;
 
@@ -2343,35 +2564,34 @@ FIO_decompressGzFrame(dRess_t* ress, FILE* srcFile,
             DISPLAYLEVEL(1, "zstd: %s: inflate error %d \n", srcFileName, ret);
             decodingError = 1; break;
         }
-        {   size_t const decompBytes = ress->dstBufferSize - strm.avail_out;
+        {   size_t const decompBytes = writeJob->bufferSize - strm.avail_out;
             if (decompBytes) {
-                storedSkips = FIO_fwriteSparse(ress->dstFile, ress->dstBuffer, decompBytes, prefs, storedSkips);
+                writeJob->usedBufferSize = decompBytes;
+                WritePool_queueAndReacquireWriteJob(&writeJob);
                 outFileSize += decompBytes;
-                strm.next_out = (Bytef*)ress->dstBuffer;
-                strm.avail_out = (uInt)ress->dstBufferSize;
+                strm.next_out = (Bytef*)writeJob->buffer;
+                strm.avail_out = (uInt)writeJob->bufferSize;
             }
         }
         if (ret == Z_STREAM_END) break;
     }
 
-    if (strm.avail_in > 0)
-        memmove(ress->srcBuffer, strm.next_in, strm.avail_in);
-    ress->srcBufferLoaded = strm.avail_in;
+    FIO_consumeDSrcBuffer(ress, ress->srcBufferLoaded - strm.avail_in);
+
     if ( (inflateEnd(&strm) != Z_OK)  /* release resources ; error detected */
       && (decodingError==0) ) {
         DISPLAYLEVEL(1, "zstd: %s: inflateEnd error \n", srcFileName);
         decodingError = 1;
     }
-    FIO_fwriteSparseEnd(prefs, ress->dstFile, storedSkips);
+    WritePool_releaseWriteJob(writeJob);
+    WritePool_sparseWriteEnd(ress->writePoolCtx);
     return decodingError ? FIO_ERROR_FRAME_DECODING : outFileSize;
 }
 #endif
 
-
 #ifdef ZSTD_LZMADECOMPRESS
 static unsigned long long
 FIO_decompressLzmaFrame(dRess_t* ress, FILE* srcFile,
-                        const FIO_prefs_t* const prefs,
                         const char* srcFileName, int plain_lzma)
 {
     unsigned long long outFileSize = 0;
@@ -2379,7 +2599,7 @@ FIO_decompressLzmaFrame(dRess_t* ress, FILE* srcFile,
     lzma_action action = LZMA_RUN;
     lzma_ret initRet;
     int decodingError = 0;
-    unsigned storedSkips = 0;
+    write_job_t *writeJob = NULL;
 
     strm.next_in = 0;
     strm.avail_in = 0;
@@ -2396,8 +2616,9 @@ FIO_decompressLzmaFrame(dRess_t* ress, FILE* srcFile,
         return FIO_ERROR_FRAME_DECODING;
     }
 
-    strm.next_out = (BYTE*)ress->dstBuffer;
-    strm.avail_out = ress->dstBufferSize;
+    writeJob = WritePool_acquireWriteJob(ress->writePoolCtx);
+    strm.next_out = (Bytef*)writeJob->buffer;
+    strm.avail_out = (uInt)writeJob->bufferSize;
     strm.next_in = (BYTE const*)ress->srcBuffer;
     strm.avail_in = ress->srcBufferLoaded;
 
@@ -2420,21 +2641,21 @@ FIO_decompressLzmaFrame(dRess_t* ress, FILE* srcFile,
                             srcFileName, ret);
             decodingError = 1; break;
         }
-        {   size_t const decompBytes = ress->dstBufferSize - strm.avail_out;
+        {   size_t const decompBytes = writeJob->bufferSize - strm.avail_out;
             if (decompBytes) {
-                storedSkips = FIO_fwriteSparse(ress->dstFile, ress->dstBuffer, decompBytes, prefs, storedSkips);
+                writeJob->usedBufferSize = decompBytes;
+                WritePool_queueAndReacquireWriteJob(&writeJob);
                 outFileSize += decompBytes;
-                strm.next_out = (BYTE*)ress->dstBuffer;
-                strm.avail_out = ress->dstBufferSize;
+                strm.next_out = (Bytef*)writeJob->buffer;
+                strm.avail_out = writeJob->bufferSize;
         }   }
         if (ret == LZMA_STREAM_END) break;
     }
 
-    if (strm.avail_in > 0)
-        memmove(ress->srcBuffer, strm.next_in, strm.avail_in);
-    ress->srcBufferLoaded = strm.avail_in;
+    FIO_consumeDSrcBuffer(ress, ress->srcBufferLoaded - strm.avail_in);
     lzma_end(&strm);
-    FIO_fwriteSparseEnd(prefs, ress->dstFile, storedSkips);
+    WritePool_releaseWriteJob(writeJob);
+    WritePool_sparseWriteEnd(ress->writePoolCtx);
     return decodingError ? FIO_ERROR_FRAME_DECODING : outFileSize;
 }
 #endif
@@ -2442,60 +2663,57 @@ FIO_decompressLzmaFrame(dRess_t* ress, FILE* srcFile,
 #ifdef ZSTD_LZ4DECOMPRESS
 static unsigned long long
 FIO_decompressLz4Frame(dRess_t* ress, FILE* srcFile,
-                       const FIO_prefs_t* const prefs,
                        const char* srcFileName)
 {
     unsigned long long filesize = 0;
-    LZ4F_errorCode_t nextToLoad;
+    LZ4F_errorCode_t nextToLoad = 4;
     LZ4F_decompressionContext_t dCtx;
     LZ4F_errorCode_t const errorCode = LZ4F_createDecompressionContext(&dCtx, LZ4F_VERSION);
     int decodingError = 0;
-    unsigned storedSkips = 0;
+    write_job_t *writeJob = WritePool_acquireWriteJob(ress->writePoolCtx);
 
     if (LZ4F_isError(errorCode)) {
         DISPLAYLEVEL(1, "zstd: failed to create lz4 decompression context \n");
         return FIO_ERROR_FRAME_DECODING;
     }
 
-    /* Init feed with magic number (already consumed from FILE* sFile) */
-    {   size_t inSize = 4;
-        size_t outSize= 0;
-        MEM_writeLE32(ress->srcBuffer, LZ4_MAGICNUMBER);
-        nextToLoad = LZ4F_decompress(dCtx, ress->dstBuffer, &outSize, ress->srcBuffer, &inSize, NULL);
-        if (LZ4F_isError(nextToLoad)) {
-            DISPLAYLEVEL(1, "zstd: %s: lz4 header error : %s \n",
-                            srcFileName, LZ4F_getErrorName(nextToLoad));
-            LZ4F_freeDecompressionContext(dCtx);
-            return FIO_ERROR_FRAME_DECODING;
-    }   }
-
     /* Main Loop */
     for (;nextToLoad;) {
         size_t readSize;
         size_t pos = 0;
-        size_t decodedBytes = ress->dstBufferSize;
+        size_t decodedBytes = writeJob->bufferSize;
+        int fullBufferDecoded = 0;
 
         /* Read input */
-        if (nextToLoad > ress->srcBufferSize) nextToLoad = ress->srcBufferSize;
-        readSize = fread(ress->srcBuffer, 1, nextToLoad, srcFile);
-        if (!readSize) break;   /* reached end of file or stream */
+        nextToLoad = MIN(nextToLoad, ress->srcBufferSize-ress->srcBufferLoaded);
+        readSize = fread((char *)ress->srcBuffer + ress->srcBufferLoaded, 1, nextToLoad, srcFile);
+        if(!readSize && ferror(srcFile)) {
+            DISPLAYLEVEL(1, "zstd: %s: read error \n", srcFileName);
+            decodingError=1;
+            break;
+        }
+        if(!readSize && !ress->srcBufferLoaded) break; /* reached end of file */
+        ress->srcBufferLoaded += readSize;
 
-        while ((pos < readSize) || (decodedBytes == ress->dstBufferSize)) {  /* still to read, or still to flush */
+        while ((pos < ress->srcBufferLoaded) || fullBufferDecoded) {  /* still to read, or still to flush */
             /* Decode Input (at least partially) */
-            size_t remaining = readSize - pos;
-            decodedBytes = ress->dstBufferSize;
-            nextToLoad = LZ4F_decompress(dCtx, ress->dstBuffer, &decodedBytes, (char*)(ress->srcBuffer)+pos, &remaining, NULL);
+            size_t remaining = ress->srcBufferLoaded - pos;
+            decodedBytes = writeJob->bufferSize;
+            nextToLoad = LZ4F_decompress(dCtx, writeJob->buffer, &decodedBytes, (char*)(ress->srcBuffer)+pos, &remaining, NULL);
             if (LZ4F_isError(nextToLoad)) {
                 DISPLAYLEVEL(1, "zstd: %s: lz4 decompression error : %s \n",
                                 srcFileName, LZ4F_getErrorName(nextToLoad));
                 decodingError = 1; nextToLoad = 0; break;
             }
             pos += remaining;
+            assert(pos <= ress->srcBufferLoaded);
+            fullBufferDecoded = decodedBytes == writeJob->bufferSize;
 
             /* Write Block */
             if (decodedBytes) {
                 UTIL_HumanReadableSize_t hrs;
-                storedSkips = FIO_fwriteSparse(ress->dstFile, ress->dstBuffer, decodedBytes, prefs, storedSkips);
+                writeJob->usedBufferSize = decodedBytes;
+                WritePool_queueAndReacquireWriteJob(&writeJob);
                 filesize += decodedBytes;
                 hrs = UTIL_makeHumanReadableSize(filesize);
                 DISPLAYUPDATE(2, "\rDecompressed : %.*f%s  ", hrs.precision, hrs.value, hrs.suffix);
@@ -2503,21 +2721,16 @@ FIO_decompressLz4Frame(dRess_t* ress, FILE* srcFile,
 
             if (!nextToLoad) break;
         }
+        FIO_consumeDSrcBuffer(ress, pos);
     }
-    /* can be out because readSize == 0, which could be an fread() error */
-    if (ferror(srcFile)) {
-        DISPLAYLEVEL(1, "zstd: %s: read error \n", srcFileName);
-        decodingError=1;
-    }
-
     if (nextToLoad!=0) {
         DISPLAYLEVEL(1, "zstd: %s: unfinished lz4 stream \n", srcFileName);
         decodingError=1;
     }
 
     LZ4F_freeDecompressionContext(dCtx);
-    ress->srcBufferLoaded = 0; /* LZ4F will reach exact frame boundary */
-    FIO_fwriteSparseEnd(prefs, ress->dstFile, storedSkips);
+    WritePool_releaseWriteJob(writeJob);
+    WritePool_sparseWriteEnd(ress->writePoolCtx);
 
     return decodingError ? FIO_ERROR_FRAME_DECODING : filesize;
 }
@@ -2566,7 +2779,7 @@ static int FIO_decompressFrames(FIO_ctx_t* const fCtx,
             filesize += frameSize;
         } else if (buf[0] == 31 && buf[1] == 139) { /* gz magic number */
 #ifdef ZSTD_GZDECOMPRESS
-            unsigned long long const frameSize = FIO_decompressGzFrame(&ress, srcFile, prefs, srcFileName);
+            unsigned long long const frameSize = FIO_decompressGzFrame(&ress, srcFile, srcFileName);
             if (frameSize == FIO_ERROR_FRAME_DECODING) return 1;
             filesize += frameSize;
 #else
@@ -2576,7 +2789,7 @@ static int FIO_decompressFrames(FIO_ctx_t* const fCtx,
         } else if ((buf[0] == 0xFD && buf[1] == 0x37)  /* xz magic number */
                 || (buf[0] == 0x5D && buf[1] == 0x00)) { /* lzma header (no magic number) */
 #ifdef ZSTD_LZMADECOMPRESS
-            unsigned long long const frameSize = FIO_decompressLzmaFrame(&ress, srcFile, prefs, srcFileName, buf[0] != 0xFD);
+            unsigned long long const frameSize = FIO_decompressLzmaFrame(&ress, srcFile, srcFileName, buf[0] != 0xFD);
             if (frameSize == FIO_ERROR_FRAME_DECODING) return 1;
             filesize += frameSize;
 #else
@@ -2585,7 +2798,7 @@ static int FIO_decompressFrames(FIO_ctx_t* const fCtx,
 #endif
         } else if (MEM_readLE32(buf) == LZ4_MAGICNUMBER) {
 #ifdef ZSTD_LZ4DECOMPRESS
-            unsigned long long const frameSize = FIO_decompressLz4Frame(&ress, srcFile, prefs, srcFileName);
+            unsigned long long const frameSize = FIO_decompressLz4Frame(&ress, srcFile, srcFileName);
             if (frameSize == FIO_ERROR_FRAME_DECODING) return 1;
             filesize += frameSize;
 #else
@@ -2594,7 +2807,7 @@ static int FIO_decompressFrames(FIO_ctx_t* const fCtx,
 #endif
         } else if ((prefs->overwrite) && !strcmp (dstFileName, stdoutmark)) {  /* pass-through mode */
             return FIO_passThrough(prefs,
-                                   ress.dstFile, srcFile,
+                                   ress.writePoolCtx->dstFile, srcFile,
                                    ress.srcBuffer, ress.srcBufferSize,
                                    ress.srcBufferLoaded);
         } else {
@@ -2632,7 +2845,8 @@ static int FIO_decompressDstFile(FIO_ctx_t* const fCtx,
     int releaseDstFile = 0;
     int transferMTime = 0;
 
-    if ((ress.dstFile == NULL) && (prefs->testMode==0)) {
+    if ((ress.writePoolCtx->dstFile == NULL) && (prefs->testMode==0)) {
+        FILE *dstFile;
         int dstFilePermissions = DEFAULT_FILE_PERMISSIONS;
         if ( strcmp(srcFileName, stdinmark)   /* special case : don't transfer permissions from stdin */
           && strcmp(dstFileName, stdoutmark)
@@ -2644,8 +2858,9 @@ static int FIO_decompressDstFile(FIO_ctx_t* const fCtx,
 
         releaseDstFile = 1;
 
-        ress.dstFile = FIO_openDstFile(fCtx, prefs, srcFileName, dstFileName, dstFilePermissions);
-        if (ress.dstFile==NULL) return 1;
+        dstFile = FIO_openDstFile(fCtx, prefs, srcFileName, dstFileName, dstFilePermissions);
+        if (dstFile==NULL) return 1;
+        WritePool_setDstFile(ress.writePoolCtx, dstFile);
 
         /* Must only be added after FIO_openDstFile() succeeds.
          * Otherwise we may delete the destination file if it already exists,
@@ -2657,10 +2872,8 @@ static int FIO_decompressDstFile(FIO_ctx_t* const fCtx,
     result = FIO_decompressFrames(fCtx, ress, srcFile, prefs, dstFileName, srcFileName);
 
     if (releaseDstFile) {
-        FILE* const dstFile = ress.dstFile;
         clearHandler();
-        ress.dstFile = NULL;
-        if (fclose(dstFile)) {
+        if (WritePool_closeDstFile(ress.writePoolCtx)) {
             DISPLAYLEVEL(1, "zstd: %s: %s \n", dstFileName, strerror(errno));
             result = 1;
         }
@@ -2874,15 +3087,16 @@ FIO_decompressMultipleFilenames(FIO_ctx_t* const fCtx,
             return 1;
         }
         if (!prefs->testMode) {
-            ress.dstFile = FIO_openDstFile(fCtx, prefs, NULL, outFileName, DEFAULT_FILE_PERMISSIONS);
-            if (ress.dstFile == 0) EXM_THROW(19, "cannot open %s", outFileName);
+            FILE* dstFile = FIO_openDstFile(fCtx, prefs, NULL, outFileName, DEFAULT_FILE_PERMISSIONS);
+            if (dstFile == 0) EXM_THROW(19, "cannot open %s", outFileName);
+            WritePool_setDstFile(ress.writePoolCtx, dstFile);
         }
         for (; fCtx->currFileIdx < fCtx->nbFilesTotal; fCtx->currFileIdx++) {
             status = FIO_decompressSrcFile(fCtx, prefs, ress, outFileName, srcNamesTable[fCtx->currFileIdx]);
             if (!status) fCtx->nbFilesProcessed++;
             error |= status;
         }
-        if ((!prefs->testMode) && (fclose(ress.dstFile)))
+        if ((!prefs->testMode) && (WritePool_closeDstFile(ress.writePoolCtx)))
             EXM_THROW(72, "Write error : %s : cannot properly close output file",
                         strerror(errno));
     } else {
index 61094db83cba374f6aa1694153cb5510548f64be..398937a64e825a691db0f972927602f859115f1b 100644 (file)
@@ -109,6 +109,7 @@ void FIO_setAllowBlockDevices(FIO_prefs_t* const prefs, int allowBlockDevices);
 void FIO_setPatchFromMode(FIO_prefs_t* const prefs, int value);
 void FIO_setContentSize(FIO_prefs_t* const prefs, int value);
 void FIO_displayCompressionParameters(const FIO_prefs_t* prefs);
+void FIO_setAsyncIOFlag(FIO_prefs_t* const prefs, unsigned value);
 
 /* FIO_ctx_t functions */
 void FIO_setNbFilesTotal(FIO_ctx_t* const fCtx, int value);
index bfe18c0c1ba334cd17125eb91eed827f55fbbc0c..fd563e1c24d0e47e2d930f80fb48bb7fdb4af894 100644 (file)
@@ -239,9 +239,12 @@ static void usage_advanced(const char* programName)
 #ifndef ZSTD_NODECOMPRESS
     DISPLAYOUT( "\n");
     DISPLAYOUT( "Advanced decompression arguments : \n");
-    DISPLAYOUT( " -l     : print information about zstd compressed files \n");
-    DISPLAYOUT( "--test  : test compressed file integrity \n");
-    DISPLAYOUT( " -M#    : Set a memory usage limit for decompression \n");
+    DISPLAYOUT( " -l        : print information about zstd compressed files \n");
+    DISPLAYOUT( "--test     : test compressed file integrity \n");
+    DISPLAYOUT( " -M#       : Set a memory usage limit for decompression \n");
+#ifdef ZSTD_MULTITHREAD
+    DISPLAYOUT( "--[no-]asyncio  : use threaded asynchronous IO for output (default: disabled) \n");
+#endif
 # if ZSTD_SPARSE_DEFAULT
     DISPLAYOUT( "--[no-]sparse : sparse mode (default: enabled on file, disabled on stdout) \n");
 # else
@@ -912,6 +915,8 @@ int main(int argCount, const char* argv[])
                 if (!strcmp(argument, "--sparse")) { FIO_setSparseWrite(prefs, 2); continue; }
                 if (!strcmp(argument, "--no-sparse")) { FIO_setSparseWrite(prefs, 0); continue; }
                 if (!strcmp(argument, "--test")) { operation=zom_test; continue; }
+                if (!strcmp(argument, "--asyncio")) { FIO_setAsyncIOFlag(prefs, 1); continue;}
+                if (!strcmp(argument, "--no-asyncio")) { FIO_setAsyncIOFlag(prefs, 0); continue;}
                 if (!strcmp(argument, "--train")) { operation=zom_train; if (outFileName==NULL) outFileName=g_defaultDictName; continue; }
                 if (!strcmp(argument, "--no-dictID")) { FIO_setDictIDFlag(prefs, 0); continue; }
                 if (!strcmp(argument, "--keep")) { FIO_setRemoveSrcFile(prefs, 0); continue; }
index b7a3d88a817e5ada65922653291271b823692868..78d8e742aa31e4d36774c6cd1da100317cd795bd 100755 (executable)
@@ -1575,6 +1575,44 @@ elif [ "$longCSize19wlog23" -gt "$optCSize19wlog23" ]; then
     exit 1
 fi
 
+println "\n===>  zstd asyncio decompression tests "
+
+addFrame() {
+    datagen -g2M -s$2 >> tmp_uncompressed
+    datagen -g2M -s$2 | zstd --format=$1 >> tmp_compressed.zst
+}
+
+addTwoFrames() {
+  addFrame $1 1
+  addFrame $1 2
+}
+
+testAsyncIO() {
+  roundTripTest -g2M "3 --asyncio --format=$1"
+  roundTripTest -g2M "3 --no-asyncio --format=$1"
+}
+
+rm -f tmp_compressed tmp_uncompressed
+testAsyncIO zstd
+addTwoFrames zstd
+if [ $GZIPMODE -eq 1 ]; then
+  testAsyncIO gzip
+  addTwoFrames gzip
+fi
+if [ $LZMAMODE -eq 1 ]; then
+  testAsyncIO lzma
+  addTwoFrames lzma
+fi
+if [ $LZ4MODE -eq 1 ]; then
+  testAsyncIO lz4
+  addTwoFrames lz4
+fi
+cat tmp_uncompressed | $MD5SUM > tmp2
+zstd -d tmp_compressed.zst --asyncio -c | $MD5SUM > tmp1
+$DIFF -q tmp1 tmp2
+rm tmp1
+zstd -d tmp_compressed.zst --no-asyncio -c | $MD5SUM > tmp1
+$DIFF -q tmp1 tmp2
 
 if [ "$1" != "--test-large-data" ]; then
     println "Skipping large data tests"