]> git.ipfire.org Git - thirdparty/zstd.git/commitdiff
AsyncIO compression part 2 - added async read and asyncio to compression code (#3022)
authorYonatan Komornik <11005061+yoniko@users.noreply.github.com>
Mon, 31 Jan 2022 23:43:41 +0000 (15:43 -0800)
committerGitHub <noreply@github.com>
Mon, 31 Jan 2022 23:43:41 +0000 (15:43 -0800)
* Compression asyncio:
- Added asyncio functionality for compression flow
- Added ReadPool for async reads, implemented in both comp and decomp flows

programs/README.md
programs/fileio.c
programs/fileio_asyncio.c
programs/fileio_asyncio.h
programs/fileio_common.h
programs/fileio_types.h
programs/zstdcli.c
tests/playTests.sh

index 5570f90c3b4f84e5c48d8f21232096f79e3bec24..b88cf78d71a46aa6caeed49b823e21b7b53b367b 100644 (file)
@@ -164,6 +164,7 @@ Advanced arguments :
 --filelist FILE : read list of files to operate upon from FILE
 --output-dir-flat DIR : processed files are stored into DIR
 --output-dir-mirror DIR : processed files are stored into DIR respecting original directory structure
+--[no-]asyncio : use asynchronous IO (default: enabled)
 --[no-]check : during compression, add XXH64 integrity checksum to frame (default: enabled). If specified with -d, decompressor will ignore/validate checksums in compressed frame (default: validate).
 --      : All arguments after "--" are treated as files
 
index 64909b96401b07781440071cb4724e97fa4245dc..502f69c157683ada4f7901f34516bed77574c22e 100644 (file)
@@ -289,7 +289,7 @@ FIO_prefs_t* FIO_createPreferences(void)
     ret->literalCompressionMode = ZSTD_ps_auto;
     ret->excludeCompressedFiles = 0;
     ret->allowBlockDevices = 0;
-    ret->asyncIO = 0;
+    ret->asyncIO = AIO_supported();
     return ret;
 }
 
@@ -848,16 +848,12 @@ static int FIO_removeMultiFilesWarning(FIO_ctx_t* const fCtx, const FIO_prefs_t*
  *  Compression
  ************************************************************************/
 typedef struct {
-    FILE* srcFile;
-    FILE* dstFile;
-    void*  srcBuffer;
-    size_t srcBufferSize;
-    void*  dstBuffer;
-    size_t dstBufferSize;
     void* dictBuffer;
     size_t dictBufferSize;
     const char* dictFileName;
     ZSTD_CStream* cctx;
+    WritePoolCtx_t *writeCtx;
+    ReadPoolCtx_t *readCtx;
 } cRess_t;
 
 /** ZSTD_cycleLog() :
@@ -906,9 +902,6 @@ static cRess_t FIO_createCResources(FIO_prefs_t* const prefs,
     if (ress.cctx == NULL)
         EXM_THROW(30, "allocation error (%s): can't create ZSTD_CCtx",
                     strerror(errno));
-    ress.srcBufferSize = ZSTD_CStreamInSize();
-    ress.srcBuffer = malloc(ress.srcBufferSize);
-    ress.dstBufferSize = ZSTD_CStreamOutSize();
 
     /* need to update memLimit before calling createDictBuffer
      * because of memLimit check inside it */
@@ -916,10 +909,10 @@ static cRess_t FIO_createCResources(FIO_prefs_t* const prefs,
         unsigned long long const ssSize = (unsigned long long)prefs->streamSrcSize;
         FIO_adjustParamsForPatchFromMode(prefs, &comprParams, UTIL_getFileSize(dictFileName), ssSize > 0 ? ssSize : maxSrcFileSize, cLevel);
     }
-    ress.dstBuffer = malloc(ress.dstBufferSize);
     ress.dictBufferSize = FIO_createDictBuffer(&ress.dictBuffer, dictFileName, prefs);   /* works with dictFileName==NULL */
-    if (!ress.srcBuffer || !ress.dstBuffer)
-        EXM_THROW(31, "allocation error : not enough memory");
+
+    ress.writeCtx = AIO_WritePool_create(prefs, ZSTD_CStreamOutSize());
+    ress.readCtx = AIO_ReadPool_create(prefs, ZSTD_CStreamInSize());
 
     /* Advanced parameters, including dictionary */
     if (dictFileName && (ress.dictBuffer==NULL))
@@ -982,9 +975,9 @@ static cRess_t FIO_createCResources(FIO_prefs_t* const prefs,
 
 static void FIO_freeCResources(const cRess_t* const ress)
 {
-    free(ress->srcBuffer);
-    free(ress->dstBuffer);
     free(ress->dictBuffer);
+    AIO_WritePool_free(ress->writeCtx);
+    AIO_ReadPool_free(ress->readCtx);
     ZSTD_freeCStream(ress->cctx);   /* never fails */
 }
 
@@ -997,6 +990,7 @@ FIO_compressGzFrame(const cRess_t* ress,  /* buffers & handlers are used, but no
 {
     unsigned long long inFileSize = 0, outFileSize = 0;
     z_stream strm;
+    IOJob_t *writeJob = NULL;
 
     if (compressionLevel > Z_BEST_COMPRESSION)
         compressionLevel = Z_BEST_COMPRESSION;
@@ -1012,51 +1006,58 @@ FIO_compressGzFrame(const cRess_t* ress,  /* buffers & handlers are used, but no
             EXM_THROW(71, "zstd: %s: deflateInit2 error %d \n", srcFileName, ret);
     }   }
 
+    writeJob = AIO_WritePool_acquireJob(ress->writeCtx);
     strm.next_in = 0;
     strm.avail_in = 0;
-    strm.next_out = (Bytef*)ress->dstBuffer;
-    strm.avail_out = (uInt)ress->dstBufferSize;
+    strm.next_out = (Bytef*)writeJob->buffer;
+    strm.avail_out = (uInt)writeJob->bufferSize;
 
     while (1) {
         int ret;
         if (strm.avail_in == 0) {
-            size_t const inSize = fread(ress->srcBuffer, 1, ress->srcBufferSize, ress->srcFile);
-            if (inSize == 0) break;
-            inFileSize += inSize;
-            strm.next_in = (z_const unsigned char*)ress->srcBuffer;
-            strm.avail_in = (uInt)inSize;
+            AIO_ReadPool_fillBuffer(ress->readCtx, ZSTD_CStreamInSize());
+            if (ress->readCtx->srcBufferLoaded == 0) break;
+            inFileSize += ress->readCtx->srcBufferLoaded;
+            strm.next_in = (z_const unsigned char*)ress->readCtx->srcBuffer;
+            strm.avail_in = (uInt)ress->readCtx->srcBufferLoaded;
+        }
+
+        {
+            size_t const availBefore = strm.avail_in;
+            ret = deflate(&strm, Z_NO_FLUSH);
+            AIO_ReadPool_consumeBytes(ress->readCtx, availBefore - strm.avail_in);
         }
-        ret = deflate(&strm, Z_NO_FLUSH);
+
         if (ret != Z_OK)
             EXM_THROW(72, "zstd: %s: deflate error %d \n", srcFileName, ret);
-        {   size_t const cSize = ress->dstBufferSize - strm.avail_out;
+        {   size_t const cSize = writeJob->bufferSize - strm.avail_out;
             if (cSize) {
-                if (fwrite(ress->dstBuffer, 1, cSize, ress->dstFile) != cSize)
-                    EXM_THROW(73, "Write error : cannot write to output file : %s ", strerror(errno));
+                writeJob->usedBufferSize = cSize;
+                AIO_WritePool_enqueueAndReacquireWriteJob(&writeJob);
                 outFileSize += cSize;
-                strm.next_out = (Bytef*)ress->dstBuffer;
-                strm.avail_out = (uInt)ress->dstBufferSize;
-        }   }
+                strm.next_out = (Bytef*)writeJob->buffer;
+                strm.avail_out = (uInt)writeJob->bufferSize;
+            }   }
         if (srcFileSize == UTIL_FILESIZE_UNKNOWN) {
             DISPLAYUPDATE(2, "\rRead : %u MB ==> %.2f%% ",
-                            (unsigned)(inFileSize>>20),
-                            (double)outFileSize/inFileSize*100)
+                          (unsigned)(inFileSize>>20),
+                          (double)outFileSize/inFileSize*100)
         } else {
             DISPLAYUPDATE(2, "\rRead : %u / %u MB ==> %.2f%% ",
-                            (unsigned)(inFileSize>>20), (unsigned)(srcFileSize>>20),
-                            (double)outFileSize/inFileSize*100);
-    }   }
+                          (unsigned)(inFileSize>>20), (unsigned)(srcFileSize>>20),
+                          (double)outFileSize/inFileSize*100);
+        }   }
 
     while (1) {
         int const ret = deflate(&strm, Z_FINISH);
-        {   size_t const cSize = ress->dstBufferSize - strm.avail_out;
+        {   size_t const cSize = writeJob->bufferSize - strm.avail_out;
             if (cSize) {
-                if (fwrite(ress->dstBuffer, 1, cSize, ress->dstFile) != cSize)
-                    EXM_THROW(75, "Write error : %s ", strerror(errno));
+                writeJob->usedBufferSize = cSize;
+                AIO_WritePool_enqueueAndReacquireWriteJob(&writeJob);
                 outFileSize += cSize;
-                strm.next_out = (Bytef*)ress->dstBuffer;
-                strm.avail_out = (uInt)ress->dstBufferSize;
-        }   }
+                strm.next_out = (Bytef*)writeJob->buffer;
+                strm.avail_out = (uInt)writeJob->bufferSize;
+            }   }
         if (ret == Z_STREAM_END) break;
         if (ret != Z_BUF_ERROR)
             EXM_THROW(77, "zstd: %s: deflate error %d \n", srcFileName, ret);
@@ -1067,6 +1068,8 @@ FIO_compressGzFrame(const cRess_t* ress,  /* buffers & handlers are used, but no
             EXM_THROW(79, "zstd: %s: deflateEnd error %d \n", srcFileName, ret);
     }   }
     *readsize = inFileSize;
+    AIO_WritePool_releaseIoJob(writeJob);
+    AIO_WritePool_sparseWriteEnd(ress->writeCtx);
     return outFileSize;
 }
 #endif
@@ -1082,6 +1085,7 @@ FIO_compressLzmaFrame(cRess_t* ress,
     lzma_stream strm = LZMA_STREAM_INIT;
     lzma_action action = LZMA_RUN;
     lzma_ret ret;
+    IOJob_t *writeJob = NULL;
 
     if (compressionLevel < 0) compressionLevel = 0;
     if (compressionLevel > 9) compressionLevel = 9;
@@ -1099,31 +1103,37 @@ FIO_compressLzmaFrame(cRess_t* ress,
             EXM_THROW(83, "zstd: %s: lzma_easy_encoder error %d", srcFileName, ret);
     }
 
+    writeJob =AIO_WritePool_acquireJob(ress->writeCtx);
+    strm.next_out = (Bytef*)writeJob->buffer;
+    strm.avail_out = (uInt)writeJob->bufferSize;
     strm.next_in = 0;
     strm.avail_in = 0;
-    strm.next_out = (BYTE*)ress->dstBuffer;
-    strm.avail_out = ress->dstBufferSize;
 
     while (1) {
         if (strm.avail_in == 0) {
-            size_t const inSize = fread(ress->srcBuffer, 1, ress->srcBufferSize, ress->srcFile);
-            if (inSize == 0) action = LZMA_FINISH;
+            size_t const inSize = AIO_ReadPool_fillBuffer(ress->readCtx, ZSTD_CStreamInSize());
+            if (ress->readCtx->srcBufferLoaded == 0) action = LZMA_FINISH;
             inFileSize += inSize;
-            strm.next_in = (BYTE const*)ress->srcBuffer;
-            strm.avail_in = inSize;
+            strm.next_in = (BYTE const*)ress->readCtx->srcBuffer;
+            strm.avail_in = ress->readCtx->srcBufferLoaded;
+        }
+
+        {
+            size_t const availBefore = strm.avail_in;
+            ret = lzma_code(&strm, action);
+            AIO_ReadPool_consumeBytes(ress->readCtx, availBefore - strm.avail_in);
         }
 
-        ret = lzma_code(&strm, action);
 
         if (ret != LZMA_OK && ret != LZMA_STREAM_END)
             EXM_THROW(84, "zstd: %s: lzma_code encoding error %d", srcFileName, ret);
-        {   size_t const compBytes = ress->dstBufferSize - strm.avail_out;
+        {   size_t const compBytes = writeJob->bufferSize - strm.avail_out;
             if (compBytes) {
-                if (fwrite(ress->dstBuffer, 1, compBytes, ress->dstFile) != compBytes)
-                    EXM_THROW(85, "Write error : %s", strerror(errno));
+                writeJob->usedBufferSize = compBytes;
+                AIO_WritePool_enqueueAndReacquireWriteJob(&writeJob);
                 outFileSize += compBytes;
-                strm.next_out = (BYTE*)ress->dstBuffer;
-                strm.avail_out = ress->dstBufferSize;
+                strm.next_out = (Bytef*)writeJob->buffer;
+                strm.avail_out = writeJob->bufferSize;
         }   }
         if (srcFileSize == UTIL_FILESIZE_UNKNOWN)
             DISPLAYUPDATE(2, "\rRead : %u MB ==> %.2f%%",
@@ -1139,6 +1149,9 @@ FIO_compressLzmaFrame(cRess_t* ress,
     lzma_end(&strm);
     *readsize = inFileSize;
 
+    AIO_WritePool_releaseIoJob(writeJob);
+    AIO_WritePool_sparseWriteEnd(ress->writeCtx);
+
     return outFileSize;
 }
 #endif
@@ -1164,15 +1177,18 @@ FIO_compressLz4Frame(cRess_t* ress,
     LZ4F_preferences_t prefs;
     LZ4F_compressionContext_t ctx;
 
+    IOJob_t* writeJob = AIO_WritePool_acquireJob(ress->writeCtx);
+
     LZ4F_errorCode_t const errorCode = LZ4F_createCompressionContext(&ctx, LZ4F_VERSION);
     if (LZ4F_isError(errorCode))
         EXM_THROW(31, "zstd: failed to create lz4 compression context");
 
     memset(&prefs, 0, sizeof(prefs));
 
-    assert(blockSize <= ress->srcBufferSize);
+    assert(blockSize <= ress->readCtx->base.jobBufferSize);
 
-    prefs.autoFlush = 1;
+    /* autoflush off to mitigate a bug in lz4<=1.9.3 for compression level 12 */
+    prefs.autoFlush = 0;
     prefs.compressionLevel = compressionLevel;
     prefs.frameInfo.blockMode = LZ4F_blockLinked;
     prefs.frameInfo.blockSizeID = LZ4F_max64KB;
@@ -1180,27 +1196,25 @@ FIO_compressLz4Frame(cRess_t* ress,
 #if LZ4_VERSION_NUMBER >= 10600
     prefs.frameInfo.contentSize = (srcFileSize==UTIL_FILESIZE_UNKNOWN) ? 0 : srcFileSize;
 #endif
-    assert(LZ4F_compressBound(blockSize, &prefs) <= ress->dstBufferSize);
+    assert(LZ4F_compressBound(blockSize, &prefs) <= writeJob->bufferSize);
 
     {
-        size_t readSize;
-        size_t headerSize = LZ4F_compressBegin(ctx, ress->dstBuffer, ress->dstBufferSize, &prefs);
+        size_t headerSize = LZ4F_compressBegin(ctx, writeJob->buffer, writeJob->bufferSize, &prefs);
         if (LZ4F_isError(headerSize))
             EXM_THROW(33, "File header generation failed : %s",
                             LZ4F_getErrorName(headerSize));
-        if (fwrite(ress->dstBuffer, 1, headerSize, ress->dstFile) != headerSize)
-            EXM_THROW(34, "Write error : %s (cannot write header)", strerror(errno));
+        writeJob->usedBufferSize = headerSize;
+        AIO_WritePool_enqueueAndReacquireWriteJob(&writeJob);
         outFileSize += headerSize;
 
         /* Read first block */
-        readSize  = fread(ress->srcBuffer, (size_t)1, (size_t)blockSize, ress->srcFile);
-        inFileSize += readSize;
+        inFileSize += AIO_ReadPool_fillBuffer(ress->readCtx, blockSize);
 
         /* Main Loop */
-        while (readSize>0) {
-            size_t const outSize = LZ4F_compressUpdate(ctx,
-                                        ress->dstBuffer, ress->dstBufferSize,
-                                        ress->srcBuffer, readSize, NULL);
+        while (ress->readCtx->srcBufferLoaded) {
+            size_t inSize = MIN(blockSize, ress->readCtx->srcBufferLoaded);
+            size_t const outSize = LZ4F_compressUpdate(ctx, writeJob->buffer, writeJob->bufferSize,
+                                                       ress->readCtx->srcBuffer, inSize, NULL);
             if (LZ4F_isError(outSize))
                 EXM_THROW(35, "zstd: %s: lz4 compression failed : %s",
                             srcFileName, LZ4F_getErrorName(outSize));
@@ -1216,33 +1230,29 @@ FIO_compressLz4Frame(cRess_t* ress,
             }
 
             /* Write Block */
-            {   size_t const sizeCheck = fwrite(ress->dstBuffer, 1, outSize, ress->dstFile);
-                if (sizeCheck != outSize)
-                    EXM_THROW(36, "Write error : %s", strerror(errno));
-            }
+            writeJob->usedBufferSize = outSize;
+            AIO_WritePool_enqueueAndReacquireWriteJob(&writeJob);
 
             /* Read next block */
-            readSize  = fread(ress->srcBuffer, (size_t)1, (size_t)blockSize, ress->srcFile);
-            inFileSize += readSize;
+            AIO_ReadPool_consumeBytes(ress->readCtx, inSize);
+            inFileSize += AIO_ReadPool_fillBuffer(ress->readCtx, blockSize);
         }
-        if (ferror(ress->srcFile)) EXM_THROW(37, "Error reading %s ", srcFileName);
 
         /* End of Stream mark */
-        headerSize = LZ4F_compressEnd(ctx, ress->dstBuffer, ress->dstBufferSize, NULL);
+        headerSize = LZ4F_compressEnd(ctx, writeJob->buffer, writeJob->bufferSize, NULL);
         if (LZ4F_isError(headerSize))
             EXM_THROW(38, "zstd: %s: lz4 end of file generation failed : %s",
                         srcFileName, LZ4F_getErrorName(headerSize));
 
-        {   size_t const sizeCheck = fwrite(ress->dstBuffer, 1, headerSize, ress->dstFile);
-            if (sizeCheck != headerSize)
-                EXM_THROW(39, "Write error : %s (cannot write end of stream)",
-                            strerror(errno));
-        }
+        writeJob->usedBufferSize = headerSize;
+        AIO_WritePool_enqueueAndReacquireWriteJob(&writeJob);
         outFileSize += headerSize;
     }
 
     *readsize = inFileSize;
     LZ4F_freeCompressionContext(ctx);
+    AIO_WritePool_releaseIoJob(writeJob);
+    AIO_WritePool_sparseWriteEnd(ress->writeCtx);
 
     return outFileSize;
 }
@@ -1257,8 +1267,8 @@ FIO_compressZstdFrame(FIO_ctx_t* const fCtx,
                       int compressionLevel, U64* readsize)
 {
     cRess_t const ress = *ressPtr;
-    FILE* const srcFile = ress.srcFile;
-    FILE* const dstFile = ress.dstFile;
+    IOJob_t *writeJob = AIO_WritePool_acquireJob(ressPtr->writeCtx);
+
     U64 compressedfilesize = 0;
     ZSTD_EndDirective directive = ZSTD_e_continue;
     U64 pledgedSrcSize = ZSTD_CONTENTSIZE_UNKNOWN;
@@ -1303,12 +1313,12 @@ FIO_compressZstdFrame(FIO_ctx_t* const fCtx,
     do {
         size_t stillToFlush;
         /* Fill input Buffer */
-        size_t const inSize = fread(ress.srcBuffer, (size_t)1, ress.srcBufferSize, srcFile);
-        ZSTD_inBuffer inBuff = { ress.srcBuffer, inSize, 0 };
+        size_t const inSize = AIO_ReadPool_fillBuffer(ress.readCtx, ZSTD_CStreamInSize());
+        ZSTD_inBuffer inBuff = { ress.readCtx->srcBuffer, ress.readCtx->srcBufferLoaded, 0 };
         DISPLAYLEVEL(6, "fread %u bytes from source \n", (unsigned)inSize);
         *readsize += inSize;
 
-        if ((inSize == 0) || (*readsize == fileSize))
+        if ((ress.readCtx->srcBufferLoaded == 0) || (*readsize == fileSize))
             directive = ZSTD_e_end;
 
         stillToFlush = 1;
@@ -1316,9 +1326,10 @@ FIO_compressZstdFrame(FIO_ctx_t* const fCtx,
             || (directive == ZSTD_e_end && stillToFlush != 0) ) {
 
             size_t const oldIPos = inBuff.pos;
-            ZSTD_outBuffer outBuff = { ress.dstBuffer, ress.dstBufferSize, 0 };
+            ZSTD_outBuffer outBuff= { writeJob->buffer, writeJob->bufferSize, 0 };
             size_t const toFlushNow = ZSTD_toFlushNow(ress.cctx);
             CHECK_V(stillToFlush, ZSTD_compressStream2(ress.cctx, &outBuff, &inBuff, directive));
+            AIO_ReadPool_consumeBytes(ress.readCtx, inBuff.pos - oldIPos);
 
             /* count stats */
             inputPresented++;
@@ -1327,12 +1338,10 @@ FIO_compressZstdFrame(FIO_ctx_t* const fCtx,
 
             /* Write compressed stream */
             DISPLAYLEVEL(6, "ZSTD_compress_generic(end:%u) => input pos(%u)<=(%u)size ; output generated %u bytes \n",
-                            (unsigned)directive, (unsigned)inBuff.pos, (unsigned)inBuff.size, (unsigned)outBuff.pos);
+                         (unsigned)directive, (unsigned)inBuff.pos, (unsigned)inBuff.size, (unsigned)outBuff.pos);
             if (outBuff.pos) {
-                size_t const sizeCheck = fwrite(ress.dstBuffer, 1, outBuff.pos, dstFile);
-                if (sizeCheck != outBuff.pos)
-                    EXM_THROW(25, "Write error : %s (cannot write compressed block)",
-                                    strerror(errno));
+                writeJob->usedBufferSize = outBuff.pos;
+                AIO_WritePool_enqueueAndReacquireWriteJob(&writeJob);
                 compressedfilesize += outBuff.pos;
             }
 
@@ -1464,14 +1473,14 @@ FIO_compressZstdFrame(FIO_ctx_t* const fCtx,
         }  /* while ((inBuff.pos != inBuff.size) */
     } while (directive != ZSTD_e_end);
 
-    if (ferror(srcFile)) {
-        EXM_THROW(26, "Read error : I/O error");
-    }
     if (fileSize != UTIL_FILESIZE_UNKNOWN && *readsize != fileSize) {
         EXM_THROW(27, "Read error : Incomplete read : %llu / %llu B",
                 (unsigned long long)*readsize, (unsigned long long)fileSize);
     }
 
+    AIO_WritePool_releaseIoJob(writeJob);
+    AIO_WritePool_sparseWriteEnd(ressPtr->writeCtx);
+
     return compressedfilesize;
 }
 
@@ -1572,7 +1581,7 @@ FIO_compressFilename_internal(FIO_ctx_t* const fCtx,
 
 
 /*! FIO_compressFilename_dstFile() :
- *  open dstFileName, or pass-through if ress.dstFile != NULL,
+ *  open dstFileName, or pass-through if ress.file != NULL,
  *  then start compression with FIO_compressFilename_internal().
  *  Manages source removal (--rm) and file permissions transfer.
  *  note : ress.srcFile must be != NULL,
@@ -1591,8 +1600,9 @@ static int FIO_compressFilename_dstFile(FIO_ctx_t* const fCtx,
     int result;
     stat_t statbuf;
     int transferMTime = 0;
-    assert(ress.srcFile != NULL);
-    if (ress.dstFile == NULL) {
+    FILE *dstFile;
+    assert(AIO_ReadPool_getFile(ress.readCtx) != NULL);
+    if (AIO_WritePool_getFile(ress.writeCtx) == NULL) {
         int dstFilePermissions = DEFAULT_FILE_PERMISSIONS;
         if ( strcmp (srcFileName, stdinmark)
           && strcmp (dstFileName, stdoutmark)
@@ -1604,8 +1614,9 @@ static int FIO_compressFilename_dstFile(FIO_ctx_t* const fCtx,
 
         closeDstFile = 1;
         DISPLAYLEVEL(6, "FIO_compressFilename_dstFile: opening dst: %s \n", dstFileName);
-        ress.dstFile = FIO_openDstFile(fCtx, prefs, srcFileName, dstFileName, dstFilePermissions);
-        if (ress.dstFile==NULL) return 1;  /* could not open dstFileName */
+        dstFile = FIO_openDstFile(fCtx, prefs, srcFileName, dstFileName, dstFilePermissions);
+        if (dstFile==NULL) return 1;  /* could not open dstFileName */
+        AIO_WritePool_setFile(ress.writeCtx, dstFile);
         /* Must only be added after FIO_openDstFile() succeeds.
          * Otherwise we may delete the destination file if it already exists,
          * and the user presses Ctrl-C when asked if they wish to overwrite.
@@ -1616,13 +1627,10 @@ static int FIO_compressFilename_dstFile(FIO_ctx_t* const fCtx,
     result = FIO_compressFilename_internal(fCtx, prefs, ress, dstFileName, srcFileName, compressionLevel);
 
     if (closeDstFile) {
-        FILE* const dstFile = ress.dstFile;
-        ress.dstFile = NULL;
-
         clearHandler();
 
         DISPLAYLEVEL(6, "FIO_compressFilename_dstFile: closing dst: %s \n", dstFileName);
-        if (fclose(dstFile)) { /* error closing dstFile */
+        if (AIO_WritePool_closeFile(ress.writeCtx)) { /* error closing file */
             DISPLAYLEVEL(1, "zstd: %s: %s \n", dstFileName, strerror(errno));
             result=1;
         }
@@ -1668,6 +1676,7 @@ FIO_compressFilename_srcFile(FIO_ctx_t* const fCtx,
                              int compressionLevel)
 {
     int result;
+    FILE* srcFile;
     DISPLAYLEVEL(6, "FIO_compressFilename_srcFile: %s \n", srcFileName);
 
     /* ensure src is not a directory */
@@ -1691,13 +1700,13 @@ FIO_compressFilename_srcFile(FIO_ctx_t* const fCtx,
         return 0;
     }
 
-    ress.srcFile = FIO_openSrcFile(prefs, srcFileName);
-    if (ress.srcFile == NULL) return 1;   /* srcFile could not be opened */
+    srcFile = FIO_openSrcFile(prefs, srcFileName);
+    if (srcFile == NULL) return 1;   /* srcFile could not be opened */
 
+    AIO_ReadPool_setFile(ress.readCtx, srcFile);
     result = FIO_compressFilename_dstFile(fCtx, prefs, ress, dstFileName, srcFileName, compressionLevel);
+    AIO_ReadPool_closeFile(ress.readCtx);
 
-    fclose(ress.srcFile);
-    ress.srcFile = NULL;
     if ( prefs->removeSrcFile   /* --rm */
       && result == 0       /* success */
       && strcmp(srcFileName, stdinmark)   /* exception : don't erase stdin */
@@ -1844,23 +1853,24 @@ int FIO_compressMultipleFilenames(FIO_ctx_t* const fCtx,
     /* init */
     assert(outFileName != NULL || suffix != NULL);
     if (outFileName != NULL) {   /* output into a single destination (stdout typically) */
+        FILE *dstFile;
         if (FIO_removeMultiFilesWarning(fCtx, prefs, outFileName, 1 /* displayLevelCutoff */)) {
             FIO_freeCResources(&ress);
             return 1;
         }
-        ress.dstFile = FIO_openDstFile(fCtx, prefs, NULL, outFileName, DEFAULT_FILE_PERMISSIONS);
-        if (ress.dstFile == NULL) {  /* could not open outFileName */
+        dstFile = FIO_openDstFile(fCtx, prefs, NULL, outFileName, DEFAULT_FILE_PERMISSIONS);
+        if (dstFile == NULL) {  /* could not open outFileName */
             error = 1;
         } else {
+            AIO_WritePool_setFile(ress.writeCtx, dstFile);
             for (; fCtx->currFileIdx < fCtx->nbFilesTotal; ++fCtx->currFileIdx) {
                 status = FIO_compressFilename_srcFile(fCtx, prefs, ress, outFileName, inFileNamesTable[fCtx->currFileIdx], compressionLevel);
                 if (!status) fCtx->nbFilesProcessed++;
                 error |= status;
             }
-            if (fclose(ress.dstFile))
+            if (AIO_WritePool_closeFile(ress.writeCtx))
                 EXM_THROW(29, "Write error (%s) : cannot properly close %s",
                             strerror(errno), outFileName);
-            ress.dstFile = NULL;
         }
     } else {
         if (outMirroredRootDirName)
@@ -1916,13 +1926,10 @@ int FIO_compressMultipleFilenames(FIO_ctx_t* const fCtx,
 /* **************************************************************************
  *  Decompression
  ***************************************************************************/
-
 typedef struct {
-    void*  srcBuffer;
-    size_t srcBufferSize;
-    size_t srcBufferLoaded;
     ZSTD_DStream* dctx;
     WritePoolCtx_t *writeCtx;
+    ReadPoolCtx_t *readCtx;
 } dRess_t;
 
 static dRess_t FIO_createDResources(FIO_prefs_t* const prefs, const char* dictFileName)
@@ -1940,11 +1947,6 @@ static dRess_t FIO_createDResources(FIO_prefs_t* const prefs, const char* dictFi
     CHECK( ZSTD_DCtx_setMaxWindowSize(ress.dctx, prefs->memLimit) );
     CHECK( ZSTD_DCtx_setParameter(ress.dctx, ZSTD_d_forceIgnoreChecksum, !prefs->checksumFlag));
 
-    ress.srcBufferSize = ZSTD_DStreamInSize();
-    ress.srcBuffer = malloc(ress.srcBufferSize);
-    if (!ress.srcBuffer)
-        EXM_THROW(61, "Allocation error : not enough memory");
-
     /* dictionary */
     {   void* dictBuffer;
         size_t const dictBufferSize = FIO_createDictBuffer(&dictBuffer, dictFileName, prefs);
@@ -1953,6 +1955,7 @@ static dRess_t FIO_createDResources(FIO_prefs_t* const prefs, const char* dictFi
     }
 
     ress.writeCtx = AIO_WritePool_create(prefs, ZSTD_DStreamOutSize());
+    ress.readCtx = AIO_ReadPool_create(prefs, ZSTD_DStreamInSize());
 
     return ress;
 }
@@ -1960,47 +1963,31 @@ static dRess_t FIO_createDResources(FIO_prefs_t* const prefs, const char* dictFi
 static void FIO_freeDResources(dRess_t ress)
 {
     CHECK( ZSTD_freeDStream(ress.dctx) );
-    free(ress.srcBuffer);
     AIO_WritePool_free(ress.writeCtx);
-}
-
-/* FIO_consumeDSrcBuffer:
- * Consumes len bytes from srcBuffer's start and moves the remaining data and srcBufferLoaded accordingly. */
-static void FIO_consumeDSrcBuffer(dRess_t *ress, size_t len) {
-    assert(ress->srcBufferLoaded >= len);
-    ress->srcBufferLoaded -= len;
-    memmove(ress->srcBuffer, (char *)ress->srcBuffer + len, ress->srcBufferLoaded);
+    AIO_ReadPool_free(ress.readCtx);
 }
 
 /** FIO_passThrough() : just copy input into output, for compatibility with gzip -df mode
     @return : 0 (no error) */
-static int FIO_passThrough(const FIO_prefs_t* const prefs,
-                           FILE* foutput, FILE* finput,
-                           void* buffer, size_t bufferSize,
-                           size_t alreadyLoaded)
+static int FIO_passThrough(dRess_t *ress)
 {
-    size_t const blockSize = MIN(64 KB, bufferSize);
-    size_t readFromInput;
-    unsigned storedSkips = 0;
-
-    /* assumption : ress->srcBufferLoaded bytes already loaded and stored within buffer */
-    {   size_t const sizeCheck = fwrite(buffer, 1, alreadyLoaded, foutput);
-        if (sizeCheck != alreadyLoaded) {
-            DISPLAYLEVEL(1, "Pass-through write error : %s\n", strerror(errno));
-            return 1;
-    }   }
-
-    do {
-        readFromInput = fread(buffer, 1, blockSize, finput);
-        storedSkips = AIO_fwriteSparse(foutput, buffer, readFromInput, prefs, storedSkips);
-    } while (readFromInput == blockSize);
-    if (ferror(finput)) {
-        DISPLAYLEVEL(1, "Pass-through read error : %s\n", strerror(errno));
-        return 1;
+    size_t const blockSize = MIN(MIN(64 KB, ZSTD_DStreamInSize()), ZSTD_DStreamOutSize());
+    IOJob_t *writeJob = AIO_WritePool_acquireJob(ress->writeCtx);
+    AIO_ReadPool_fillBuffer(ress->readCtx, blockSize);
+
+    while(ress->readCtx->srcBufferLoaded) {
+        size_t writeSize;
+        writeSize = MIN(blockSize, ress->readCtx->srcBufferLoaded);
+        assert(writeSize <= writeJob->bufferSize);
+        memcpy(writeJob->buffer, ress->readCtx->srcBuffer, writeSize);
+        writeJob->usedBufferSize = writeSize;
+        AIO_WritePool_enqueueAndReacquireWriteJob(&writeJob);
+        AIO_ReadPool_consumeBytes(ress->readCtx, writeSize);
+        AIO_ReadPool_fillBuffer(ress->readCtx, blockSize);
     }
-    assert(feof(finput));
-
-    AIO_fwriteSparseEnd(prefs, foutput, storedSkips);
+    assert(ress->readCtx->reachedEof);
+    AIO_WritePool_releaseIoJob(writeJob);
+    AIO_WritePool_sparseWriteEnd(ress->writeCtx);
     return 0;
 }
 
@@ -2018,7 +2005,7 @@ FIO_zstdErrorHelp(const FIO_prefs_t* const prefs,
         return;
 
     /* Try to decode the frame header */
-    err = ZSTD_getFrameHeader(&header, ress->srcBuffer, ress->srcBufferLoaded);
+    err = ZSTD_getFrameHeader(&header, ress->readCtx->srcBuffer, ress->readCtx->srcBufferLoaded);
     if (err == 0) {
         unsigned long long const windowSize = header.windowSize;
         unsigned const windowLog = FIO_highbit64(windowSize) + ((windowSize & (windowSize - 1)) != 0);
@@ -2041,7 +2028,7 @@ FIO_zstdErrorHelp(const FIO_prefs_t* const prefs,
  */
 #define FIO_ERROR_FRAME_DECODING   ((unsigned long long)(-2))
 static unsigned long long
-FIO_decompressZstdFrame(FIO_ctx_t* const fCtx, dRess_t* ress, FILE* finput,
+FIO_decompressZstdFrame(FIO_ctx_t* const fCtx, dRess_t* ress,
                         const FIO_prefs_t* const prefs,
                         const char* srcFileName,
                         U64 alreadyDecoded)  /* for multi-frames streams */
@@ -2057,16 +2044,11 @@ FIO_decompressZstdFrame(FIO_ctx_t* const fCtx, dRess_t* ress, FILE* finput,
     ZSTD_DCtx_reset(ress->dctx, ZSTD_reset_session_only);
 
     /* Header loading : ensures ZSTD_getFrameHeader() will succeed */
-    {   size_t const toDecode = ZSTD_FRAMEHEADERSIZE_MAX;
-        if (ress->srcBufferLoaded < toDecode) {
-            size_t const toRead = toDecode - ress->srcBufferLoaded;
-            void* const startPosition = (char*)ress->srcBuffer + ress->srcBufferLoaded;
-            ress->srcBufferLoaded += fread(startPosition, 1, toRead, finput);
-    }   }
+    AIO_ReadPool_fillBuffer(ress->readCtx, ZSTD_FRAMEHEADERSIZE_MAX);
 
     /* Main decompression Loop */
     while (1) {
-        ZSTD_inBuffer  inBuff = { ress->srcBuffer, ress->srcBufferLoaded, 0 };
+        ZSTD_inBuffer  inBuff = { ress->readCtx->srcBuffer, ress->readCtx->srcBufferLoaded, 0 };
         ZSTD_outBuffer outBuff= { writeJob->buffer, writeJob->bufferSize, 0 };
         size_t const readSizeHint = ZSTD_decompressStream(ress->dctx, &outBuff, &inBuff);
         const int displayLevel = (g_display_prefs.progressSetting == FIO_ps_always) ? 1 : 2;
@@ -2088,7 +2070,7 @@ FIO_decompressZstdFrame(FIO_ctx_t* const fCtx, dRess_t* ress, FILE* finput,
             if (srcFileNameSize > 18) {
                 const char* truncatedSrcFileName = srcFileName + srcFileNameSize - 15;
                 DISPLAYUPDATE(displayLevel, "\rDecompress: %2u/%2u files. Current: ...%s : %.*f%s...    ",
-                                fCtx->currFileIdx+1, fCtx->nbFilesTotal, truncatedSrcFileName, hrs.precision, hrs.value, hrs.suffix);
+                              fCtx->currFileIdx+1, fCtx->nbFilesTotal, truncatedSrcFileName, hrs.precision, hrs.value, hrs.suffix);
             } else {
                 DISPLAYUPDATE(displayLevel, "\rDecompress: %2u/%2u files. Current: %s : %.*f%s...    ",
                             fCtx->currFileIdx+1, fCtx->nbFilesTotal, srcFileName, hrs.precision, hrs.value, hrs.suffix);
@@ -2098,23 +2080,21 @@ FIO_decompressZstdFrame(FIO_ctx_t* const fCtx, dRess_t* ress, FILE* finput,
                             srcFileName, hrs.precision, hrs.value, hrs.suffix);
         }
 
-        FIO_consumeDSrcBuffer(ress, inBuff.pos);
+        AIO_ReadPool_consumeBytes(ress->readCtx, inBuff.pos);
 
         if (readSizeHint == 0) break;   /* end of frame */
 
         /* Fill input buffer */
-        {   size_t const toDecode = MIN(readSizeHint, ress->srcBufferSize);  /* support large skippable frames */
-            if (ress->srcBufferLoaded < toDecode) {
-                size_t const toRead = toDecode - ress->srcBufferLoaded;   /* > 0 */
-                void* const startPosition = (char*)ress->srcBuffer + ress->srcBufferLoaded;
-                size_t const readSize = fread(startPosition, 1, toRead, finput);
+        {   size_t const toDecode = MIN(readSizeHint, ZSTD_DStreamInSize());  /* support large skippable frames */
+            if (ress->readCtx->srcBufferLoaded < toDecode) {
+                size_t const readSize = AIO_ReadPool_fillBuffer(ress->readCtx, toDecode);
                 if (readSize==0) {
                     DISPLAYLEVEL(1, "%s : Read error (39) : premature end \n",
-                                    srcFileName);
+                                 srcFileName);
+                    AIO_WritePool_releaseIoJob(writeJob);
                     return FIO_ERROR_FRAME_DECODING;
                 }
-                ress->srcBufferLoaded += readSize;
-    }   }   }
+            }   }   }
 
     AIO_WritePool_releaseIoJob(writeJob);
     AIO_WritePool_sparseWriteEnd(ress->writeCtx);
@@ -2125,7 +2105,7 @@ FIO_decompressZstdFrame(FIO_ctx_t* const fCtx, dRess_t* ress, FILE* finput,
 
 #ifdef ZSTD_GZDECOMPRESS
 static unsigned long long
-FIO_decompressGzFrame(dRess_t* ress, FILE* srcFile, const char* srcFileName)
+FIO_decompressGzFrame(dRess_t* ress, const char* srcFileName)
 {
     unsigned long long outFileSize = 0;
     z_stream strm;
@@ -2145,16 +2125,16 @@ FIO_decompressGzFrame(dRess_t* ress, FILE* srcFile, const char* srcFileName)
     writeJob = AIO_WritePool_acquireJob(ress->writeCtx);
     strm.next_out = (Bytef*)writeJob->buffer;
     strm.avail_out = (uInt)writeJob->bufferSize;
-    strm.avail_in = (uInt)ress->srcBufferLoaded;
-    strm.next_in = (z_const unsigned char*)ress->srcBuffer;
+    strm.avail_in = (uInt)ress->readCtx->srcBufferLoaded;
+    strm.next_in = (z_const unsigned char*)ress->readCtx->srcBuffer;
 
     for ( ; ; ) {
         int ret;
         if (strm.avail_in == 0) {
-            ress->srcBufferLoaded = fread(ress->srcBuffer, 1, ress->srcBufferSize, srcFile);
-            if (ress->srcBufferLoaded == 0) flush = Z_FINISH;
-            strm.next_in = (z_const unsigned char*)ress->srcBuffer;
-            strm.avail_in = (uInt)ress->srcBufferLoaded;
+            AIO_ReadPool_consumeAndRefill(ress->readCtx);
+            if (ress->readCtx->srcBufferLoaded == 0) flush = Z_FINISH;
+            strm.next_in = (z_const unsigned char*)ress->readCtx->srcBuffer;
+            strm.avail_in = (uInt)ress->readCtx->srcBufferLoaded;
         }
         ret = inflate(&strm, flush);
         if (ret == Z_BUF_ERROR) {
@@ -2177,7 +2157,7 @@ FIO_decompressGzFrame(dRess_t* ress, FILE* srcFile, const char* srcFileName)
         if (ret == Z_STREAM_END) break;
     }
 
-    FIO_consumeDSrcBuffer(ress, ress->srcBufferLoaded - strm.avail_in);
+    AIO_ReadPool_consumeBytes(ress->readCtx, ress->readCtx->srcBufferLoaded - strm.avail_in);
 
     if ( (inflateEnd(&strm) != Z_OK)  /* release resources ; error detected */
       && (decodingError==0) ) {
@@ -2192,7 +2172,7 @@ FIO_decompressGzFrame(dRess_t* ress, FILE* srcFile, const char* srcFileName)
 
 #ifdef ZSTD_LZMADECOMPRESS
 static unsigned long long
-FIO_decompressLzmaFrame(dRess_t* ress, FILE* srcFile,
+FIO_decompressLzmaFrame(dRess_t* ress,
                         const char* srcFileName, int plain_lzma)
 {
     unsigned long long outFileSize = 0;
@@ -2220,16 +2200,16 @@ FIO_decompressLzmaFrame(dRess_t* ress, FILE* srcFile,
     writeJob = AIO_WritePool_acquireJob(ress->writeCtx);
     strm.next_out = (Bytef*)writeJob->buffer;
     strm.avail_out = (uInt)writeJob->bufferSize;
-    strm.next_in = (BYTE const*)ress->srcBuffer;
-    strm.avail_in = ress->srcBufferLoaded;
+    strm.next_in = (BYTE const*)ress->readCtx->srcBuffer;
+    strm.avail_in = ress->readCtx->srcBufferLoaded;
 
     for ( ; ; ) {
         lzma_ret ret;
         if (strm.avail_in == 0) {
-            ress->srcBufferLoaded = fread(ress->srcBuffer, 1, ress->srcBufferSize, srcFile);
-            if (ress->srcBufferLoaded == 0) action = LZMA_FINISH;
-            strm.next_in = (BYTE const*)ress->srcBuffer;
-            strm.avail_in = ress->srcBufferLoaded;
+            AIO_ReadPool_consumeAndRefill(ress->readCtx);
+            if (ress->readCtx->srcBufferLoaded == 0) action = LZMA_FINISH;
+            strm.next_in = (BYTE const*)ress->readCtx->srcBuffer;
+            strm.avail_in = ress->readCtx->srcBufferLoaded;
         }
         ret = lzma_code(&strm, action);
 
@@ -2253,7 +2233,7 @@ FIO_decompressLzmaFrame(dRess_t* ress, FILE* srcFile,
         if (ret == LZMA_STREAM_END) break;
     }
 
-    FIO_consumeDSrcBuffer(ress, ress->srcBufferLoaded - strm.avail_in);
+    AIO_ReadPool_consumeBytes(ress->readCtx, ress->readCtx->srcBufferLoaded - strm.avail_in);
     lzma_end(&strm);
     AIO_WritePool_releaseIoJob(writeJob);
     AIO_WritePool_sparseWriteEnd(ress->writeCtx);
@@ -2263,8 +2243,7 @@ FIO_decompressLzmaFrame(dRess_t* ress, FILE* srcFile,
 
 #ifdef ZSTD_LZ4DECOMPRESS
 static unsigned long long
-FIO_decompressLz4Frame(dRess_t* ress, FILE* srcFile,
-                       const char* srcFileName)
+FIO_decompressLz4Frame(dRess_t* ress, const char* srcFileName)
 {
     unsigned long long filesize = 0;
     LZ4F_errorCode_t nextToLoad = 4;
@@ -2282,34 +2261,27 @@ FIO_decompressLz4Frame(dRess_t* ress, FILE* srcFile,
 
     /* Main Loop */
     for (;nextToLoad;) {
-        size_t readSize;
         size_t pos = 0;
         size_t decodedBytes = writeJob->bufferSize;
         int fullBufferDecoded = 0;
 
         /* Read input */
-        nextToLoad = MIN(nextToLoad, ress->srcBufferSize-ress->srcBufferLoaded);
-        readSize = fread((char *)ress->srcBuffer + ress->srcBufferLoaded, 1, nextToLoad, srcFile);
-        if(!readSize && ferror(srcFile)) {
-            DISPLAYLEVEL(1, "zstd: %s: read error \n", srcFileName);
-            decodingError=1;
-            break;
-        }
-        if(!readSize && !ress->srcBufferLoaded) break; /* reached end of file */
-        ress->srcBufferLoaded += readSize;
+        AIO_ReadPool_fillBuffer(ress->readCtx, nextToLoad);
+        if(!ress->readCtx->srcBufferLoaded) break; /* reached end of file */
 
-        while ((pos < ress->srcBufferLoaded) || fullBufferDecoded) {  /* still to read, or still to flush */
+        while ((pos < ress->readCtx->srcBufferLoaded) || fullBufferDecoded) {  /* still to read, or still to flush */
             /* Decode Input (at least partially) */
-            size_t remaining = ress->srcBufferLoaded - pos;
+            size_t remaining = ress->readCtx->srcBufferLoaded - pos;
             decodedBytes = writeJob->bufferSize;
-            nextToLoad = LZ4F_decompress(dCtx, writeJob->buffer, &decodedBytes, (char*)(ress->srcBuffer)+pos, &remaining, NULL);
+            nextToLoad = LZ4F_decompress(dCtx, writeJob->buffer, &decodedBytes, (char*)(ress->readCtx->srcBuffer)+pos,
+                                         &remaining, NULL);
             if (LZ4F_isError(nextToLoad)) {
                 DISPLAYLEVEL(1, "zstd: %s: lz4 decompression error : %s \n",
                                 srcFileName, LZ4F_getErrorName(nextToLoad));
                 decodingError = 1; nextToLoad = 0; break;
             }
             pos += remaining;
-            assert(pos <= ress->srcBufferLoaded);
+            assert(pos <= ress->readCtx->srcBufferLoaded);
             fullBufferDecoded = decodedBytes == writeJob->bufferSize;
 
             /* Write Block */
@@ -2324,7 +2296,7 @@ FIO_decompressLz4Frame(dRess_t* ress, FILE* srcFile,
 
             if (!nextToLoad) break;
         }
-        FIO_consumeDSrcBuffer(ress, pos);
+        AIO_ReadPool_consumeBytes(ress->readCtx, pos);
     }
     if (nextToLoad!=0) {
         DISPLAYLEVEL(1, "zstd: %s: unfinished lz4 stream \n", srcFileName);
@@ -2348,23 +2320,20 @@ FIO_decompressLz4Frame(dRess_t* ress, FILE* srcFile,
  *           1 : error
  */
 static int FIO_decompressFrames(FIO_ctx_t* const fCtx,
-                          dRess_t ress, FILE* srcFile,
-                          const FIO_prefs_t* const prefs,
-                          const char* dstFileName, const char* srcFileName)
+                                dRess_t ress, const FIO_prefs_t* const prefs,
+                                const char* dstFileName, const char* srcFileName)
 {
     unsigned readSomething = 0;
     unsigned long long filesize = 0;
-    assert(srcFile != NULL);
 
     /* for each frame */
     for ( ; ; ) {
         /* check magic number -> version */
         size_t const toRead = 4;
-        const BYTE* const buf = (const BYTE*)ress.srcBuffer;
-        if (ress.srcBufferLoaded < toRead)  /* load up to 4 bytes for header */
-            ress.srcBufferLoaded += fread((char*)ress.srcBuffer + ress.srcBufferLoaded,
-                                          (size_t)1, toRead - ress.srcBufferLoaded, srcFile);
-        if (ress.srcBufferLoaded==0) {
+        const BYTE* buf;
+        AIO_ReadPool_fillBuffer(ress.readCtx, toRead);
+        buf = (const BYTE*)ress.readCtx->srcBuffer;
+        if (ress.readCtx->srcBufferLoaded==0) {
             if (readSomething==0) {  /* srcFile is empty (which is invalid) */
                 DISPLAYLEVEL(1, "zstd: %s: unexpected end of file \n", srcFileName);
                 return 1;
@@ -2372,17 +2341,17 @@ static int FIO_decompressFrames(FIO_ctx_t* const fCtx,
             break;   /* no more input */
         }
         readSomething = 1;   /* there is at least 1 byte in srcFile */
-        if (ress.srcBufferLoaded < toRead) {
+        if (ress.readCtx->srcBufferLoaded < toRead) {
             DISPLAYLEVEL(1, "zstd: %s: unknown header \n", srcFileName);
             return 1;
         }
-        if (ZSTD_isFrame(buf, ress.srcBufferLoaded)) {
-            unsigned long long const frameSize = FIO_decompressZstdFrame(fCtx, &ress, srcFile, prefs, srcFileName, filesize);
+        if (ZSTD_isFrame(buf, ress.readCtx->srcBufferLoaded)) {
+            unsigned long long const frameSize = FIO_decompressZstdFrame(fCtx, &ress, prefs, srcFileName, filesize);
             if (frameSize == FIO_ERROR_FRAME_DECODING) return 1;
             filesize += frameSize;
         } else if (buf[0] == 31 && buf[1] == 139) { /* gz magic number */
 #ifdef ZSTD_GZDECOMPRESS
-            unsigned long long const frameSize = FIO_decompressGzFrame(&ress, srcFile, srcFileName);
+            unsigned long long const frameSize = FIO_decompressGzFrame(&ress, srcFileName);
             if (frameSize == FIO_ERROR_FRAME_DECODING) return 1;
             filesize += frameSize;
 #else
@@ -2392,7 +2361,7 @@ static int FIO_decompressFrames(FIO_ctx_t* const fCtx,
         } else if ((buf[0] == 0xFD && buf[1] == 0x37)  /* xz magic number */
                 || (buf[0] == 0x5D && buf[1] == 0x00)) { /* lzma header (no magic number) */
 #ifdef ZSTD_LZMADECOMPRESS
-            unsigned long long const frameSize = FIO_decompressLzmaFrame(&ress, srcFile, srcFileName, buf[0] != 0xFD);
+            unsigned long long const frameSize = FIO_decompressLzmaFrame(&ress, srcFileName, buf[0] != 0xFD);
             if (frameSize == FIO_ERROR_FRAME_DECODING) return 1;
             filesize += frameSize;
 #else
@@ -2401,7 +2370,7 @@ static int FIO_decompressFrames(FIO_ctx_t* const fCtx,
 #endif
         } else if (MEM_readLE32(buf) == LZ4_MAGICNUMBER) {
 #ifdef ZSTD_LZ4DECOMPRESS
-            unsigned long long const frameSize = FIO_decompressLz4Frame(&ress, srcFile, srcFileName);
+            unsigned long long const frameSize = FIO_decompressLz4Frame(&ress, srcFileName);
             if (frameSize == FIO_ERROR_FRAME_DECODING) return 1;
             filesize += frameSize;
 #else
@@ -2409,10 +2378,7 @@ static int FIO_decompressFrames(FIO_ctx_t* const fCtx,
             return 1;
 #endif
         } else if ((prefs->overwrite) && !strcmp (dstFileName, stdoutmark)) {  /* pass-through mode */
-            return FIO_passThrough(prefs,
-                                   AIO_WritePool_getFile(ress.writeCtx), srcFile,
-                                   ress.srcBuffer, ress.srcBufferSize,
-                                   ress.srcBufferLoaded);
+            return FIO_passThrough(&ress);
         } else {
             DISPLAYLEVEL(1, "zstd: %s: unsupported format \n", srcFileName);
             return 1;
@@ -2432,15 +2398,14 @@ static int FIO_decompressFrames(FIO_ctx_t* const fCtx,
 }
 
 /** FIO_decompressDstFile() :
-    open `dstFileName`,
-    or path-through if ress.dstFile is already != 0,
+    open `dstFileName`, or pass-through if writeCtx's file is already != 0,
     then start decompression process (FIO_decompressFrames()).
     @return : 0 : OK
               1 : operation aborted
 */
 static int FIO_decompressDstFile(FIO_ctx_t* const fCtx,
                                  FIO_prefs_t* const prefs,
-                                 dRess_t ress, FILE* srcFile,
+                                 dRess_t ress,
                                  const char* dstFileName, const char* srcFileName)
 {
     int result;
@@ -2472,7 +2437,7 @@ static int FIO_decompressDstFile(FIO_ctx_t* const fCtx,
         addHandler(dstFileName);
     }
 
-    result = FIO_decompressFrames(fCtx, ress, srcFile, prefs, dstFileName, srcFileName);
+    result = FIO_decompressFrames(fCtx, ress, prefs, dstFileName, srcFileName);
 
     if (releaseDstFile) {
         clearHandler();
@@ -2513,9 +2478,11 @@ static int FIO_decompressSrcFile(FIO_ctx_t* const fCtx, FIO_prefs_t* const prefs
 
     srcFile = FIO_openSrcFile(prefs, srcFileName);
     if (srcFile==NULL) return 1;
-    ress.srcBufferLoaded = 0;
+    AIO_ReadPool_setFile(ress.readCtx, srcFile);
+
+    result = FIO_decompressDstFile(fCtx, prefs, ress, dstFileName, srcFileName);
 
-    result = FIO_decompressDstFile(fCtx, prefs, ress, srcFile, dstFileName, srcFileName);
+    AIO_ReadPool_setFile(ress.readCtx, NULL);
 
     /* Close file */
     if (fclose(srcFile)) {
index 868720a1da2b28f06cb3f8f8695fef3154f219b1..332292bbc1b39089461905da7986aa5a11c33c89 100644 (file)
@@ -1,5 +1,5 @@
 /*
- * Copyright (c) Yann Collet, Facebook, Inc.
+ * Copyright (c) Facebook, Inc.
  * All rights reserved.
  *
  * This source code is licensed under both the BSD-style license (found in the
@@ -29,7 +29,8 @@
 /** AIO_fwriteSparse() :
 *  @return : storedSkips,
 *            argument for next call to AIO_fwriteSparse() or AIO_fwriteSparseEnd() */
-unsigned AIO_fwriteSparse(FILE* file,
+static unsigned
+AIO_fwriteSparse(FILE* file,
                  const void* buffer, size_t bufferSize,
                  const FIO_prefs_t* const prefs,
                  unsigned storedSkips)
@@ -45,7 +46,7 @@ unsigned AIO_fwriteSparse(FILE* file,
     if (!prefs->sparseFileSupport) {  /* normal write */
         size_t const sizeCheck = fwrite(buffer, 1, bufferSize, file);
         if (sizeCheck != bufferSize)
-            EXM_THROW(70, "Write error : cannot write decoded block : %s",
+            EXM_THROW(70, "Write error : cannot write block : %s",
                       strerror(errno));
         return 0;
     }
@@ -77,7 +78,7 @@ unsigned AIO_fwriteSparse(FILE* file,
             storedSkips = 0;
             /* write the rest */
             if (fwrite(ptrT + nb0T, sizeof(size_t), nbNon0ST, file) != nbNon0ST)
-                EXM_THROW(93, "Write error : cannot write decoded block : %s",
+                EXM_THROW(93, "Write error : cannot write block : %s",
                           strerror(errno));
         }
         ptrT += seg0SizeT;
@@ -106,7 +107,8 @@ unsigned AIO_fwriteSparse(FILE* file,
     return storedSkips;
 }
 
-void AIO_fwriteSparseEnd(const FIO_prefs_t* const prefs, FILE* file, unsigned storedSkips)
+static void
+AIO_fwriteSparseEnd(const FIO_prefs_t* const prefs, FILE* file, unsigned storedSkips)
 {
     if (prefs->testMode) assert(storedSkips == 0);
     if (storedSkips>0) {
@@ -127,17 +129,25 @@ void AIO_fwriteSparseEnd(const FIO_prefs_t* const prefs, FILE* file, unsigned st
  *  AsyncIO functionality
  ************************************************************************/
 
+/* AIO_supported:
+ * Returns 1 if AsyncIO is supported on the system, 0 otherwise. */
+int AIO_supported(void) {
+#ifdef ZSTD_MULTITHREAD
+    return 1;
+#else
+    return 0;
+#endif
+}
+
 /* ***********************************
  *  General IoPool implementation
  *************************************/
 
 static IOJob_t *AIO_IOPool_createIoJob(IOPoolCtx_t *ctx, size_t bufferSize) {
-    void *buffer;
-    IOJob_t *job;
-    job = (IOJob_t*) malloc(sizeof(IOJob_t));
-    buffer = malloc(bufferSize);
+    IOJob_t* const job  = (IOJob_t*) malloc(sizeof(IOJob_t));
+    void* const buffer = malloc(bufferSize);
     if(!job || !buffer)
-    EXM_THROW(101, "Allocation error : not enough memory");
+        EXM_THROW(101, "Allocation error : not enough memory");
     job->buffer = buffer;
     job->bufferSize = bufferSize;
     job->usedBufferSize = 0;
@@ -151,49 +161,47 @@ static IOJob_t *AIO_IOPool_createIoJob(IOPoolCtx_t *ctx, size_t bufferSize) {
 /* AIO_IOPool_createThreadPool:
  * Creates a thread pool and a mutex for threaded IO pool.
  * Displays warning if asyncio is requested but MT isn't available. */
-static void AIO_IOPool_createThreadPool(IOPoolCtx_t *ctx, const FIO_prefs_t *prefs) {
+static void AIO_IOPool_createThreadPool(IOPoolCtx_t* ctx, const FIO_prefs_t* prefs) {
     ctx->threadPool = NULL;
     if(prefs->asyncIO) {
         if (ZSTD_pthread_mutex_init(&ctx->ioJobsMutex, NULL))
-        EXM_THROW(102,"Failed creating write availableJobs mutex");
+            EXM_THROW(102,"Failed creating write availableJobs mutex");
         /* We want MAX_IO_JOBS-2 queue items because we need to always have 1 free buffer to
          * decompress into and 1 buffer that's actively written to disk and owned by the writing thread. */
         assert(MAX_IO_JOBS >= 2);
         ctx->threadPool = POOL_create(1, MAX_IO_JOBS - 2);
         if (!ctx->threadPool)
-        EXM_THROW(104, "Failed creating writer thread pool");
+            EXM_THROW(104, "Failed creating writer thread pool");
     }
 }
 
 /* AIO_IOPool_init:
  * Allocates and sets and a new write pool including its included availableJobs. */
-static void AIO_IOPool_init(IOPoolCtx_t *ctx, FIO_prefs_t* const prefs, POOL_function poolFunction, size_t bufferSize) {
+static void AIO_IOPool_init(IOPoolCtx_t* ctx, const FIO_prefs_t* prefs, POOL_function poolFunction, size_t bufferSize) {
     int i;
     AIO_IOPool_createThreadPool(ctx, prefs);
     ctx->prefs = prefs;
     ctx->poolFunction = poolFunction;
-    ctx->totalIoJobs = ctx->threadPool ? MAX_IO_JOBS : 1;
+    ctx->totalIoJobs = ctx->threadPool ? MAX_IO_JOBS : 2;
     ctx->availableJobsCount = ctx->totalIoJobs;
     for(i=0; i < ctx->availableJobsCount; i++) {
         ctx->availableJobs[i] = AIO_IOPool_createIoJob(ctx, bufferSize);
     }
+    ctx->jobBufferSize = bufferSize;
     ctx->file = NULL;
 }
 
 
 /* AIO_IOPool_releaseIoJob:
  * Releases an acquired job back to the pool. Doesn't execute the job. */
-static void AIO_IOPool_releaseIoJob(IOJob_t *job) {
-    IOPoolCtx_t *ctx = (IOPoolCtx_t *) job->ctx;
-    if(ctx->threadPool) {
+static void AIO_IOPool_releaseIoJob(IOJob_tjob) {
+    IOPoolCtx_t* const ctx = (IOPoolCtx_t *) job->ctx;
+    if(ctx->threadPool)
         ZSTD_pthread_mutex_lock(&ctx->ioJobsMutex);
-        assert(ctx->availableJobsCount < MAX_IO_JOBS);
-        ctx->availableJobs[ctx->availableJobsCount++] = job;
+    assert(ctx->availableJobsCount < ctx->totalIoJobs);
+    ctx->availableJobs[ctx->availableJobsCount++] = job;
+    if(ctx->threadPool)
         ZSTD_pthread_mutex_unlock(&ctx->ioJobsMutex);
-    } else {
-        assert(ctx->availableJobsCount == 0);
-        ctx->availableJobsCount++;
-    }
 }
 
 /* AIO_IOPool_join:
@@ -225,19 +233,15 @@ static void AIO_IOPool_destroy(IOPoolCtx_t* ctx) {
 
 /* AIO_IOPool_acquireJob:
  * Returns an available io job to be used for a future io. */
-static IOJob_t* AIO_IOPool_acquireJob(IOPoolCtx_t *ctx) {
+static IOJob_t* AIO_IOPool_acquireJob(IOPoolCtx_tctx) {
     IOJob_t *job;
     assert(ctx->file != NULL || ctx->prefs->testMode);
-    if(ctx->threadPool) {
+    if(ctx->threadPool)
         ZSTD_pthread_mutex_lock(&ctx->ioJobsMutex);
-        assert(ctx->availableJobsCount > 0);
-        job = (IOJob_t*) ctx->availableJobs[--ctx->availableJobsCount];
+    assert(ctx->availableJobsCount > 0);
+    job = (IOJob_t*) ctx->availableJobs[--ctx->availableJobsCount];
+    if(ctx->threadPool)
         ZSTD_pthread_mutex_unlock(&ctx->ioJobsMutex);
-    } else {
-        assert(ctx->availableJobsCount == 1);
-        ctx->availableJobsCount--;
-        job = (IOJob_t*)ctx->availableJobs[0];
-    }
     job->usedBufferSize = 0;
     job->file = ctx->file;
     job->offset = 0;
@@ -249,22 +253,22 @@ static IOJob_t* AIO_IOPool_acquireJob(IOPoolCtx_t *ctx) {
  * Sets the destination file for future files in the pool.
  * Requires completion of all queues write jobs and release of all otherwise acquired jobs.
  * Also requires ending of sparse write if a previous file was used in sparse mode. */
-static void AIO_IOPool_setFile(IOPoolCtx_t *ctx, FILE* file) {
+static void AIO_IOPool_setFile(IOPoolCtx_tctx, FILE* file) {
     assert(ctx!=NULL);
     AIO_IOPool_join(ctx);
     assert(ctx->availableJobsCount == ctx->totalIoJobs);
     ctx->file = file;
 }
 
-static FILE* AIO_IOPool_getFile(IOPoolCtx_t *ctx) {
+static FILE* AIO_IOPool_getFile(const IOPoolCtx_t* ctx) {
     return ctx->file;
 }
 
 /* AIO_IOPool_enqueueJob:
  * Enqueues an io job for execution.
  * The queued job shouldn't be used directly after queueing it. */
-static void AIO_IOPool_enqueueJob(IOJob_t *job) {
-    IOPoolCtx_t* ctx = (IOPoolCtx_t *)job->ctx;
+static void AIO_IOPool_enqueueJob(IOJob_tjob) {
+    IOPoolCtx_t* const ctx = (IOPoolCtx_t *)job->ctx;
     if(ctx->threadPool)
         POOL_add(ctx->threadPool, ctx->poolFunction, job);
     else
@@ -277,7 +281,7 @@ static void AIO_IOPool_enqueueJob(IOJob_t *job) {
 
 /* AIO_WritePool_acquireJob:
  * Returns an available write job to be used for a future write. */
-IOJob_t* AIO_WritePool_acquireJob(WritePoolCtx_t *ctx) {
+IOJob_t* AIO_WritePool_acquireJob(WritePoolCtx_tctx) {
     return AIO_IOPool_acquireJob(&ctx->base);
 }
 
@@ -294,7 +298,7 @@ void AIO_WritePool_enqueueAndReacquireWriteJob(IOJob_t **job) {
 /* AIO_WritePool_sparseWriteEnd:
  * Ends sparse writes to the current file.
  * Blocks on completion of all current write jobs before executing. */
-void AIO_WritePool_sparseWriteEnd(WritePoolCtx_t *ctx) {
+void AIO_WritePool_sparseWriteEnd(WritePoolCtx_tctx) {
     assert(ctx != NULL);
     if(ctx->base.threadPool)
         POOL_joinJobs(ctx->base.threadPool);
@@ -306,28 +310,28 @@ void AIO_WritePool_sparseWriteEnd(WritePoolCtx_t *ctx) {
  * Sets the destination file for future writes in the pool.
  * Requires completion of all queues write jobs and release of all otherwise acquired jobs.
  * Also requires ending of sparse write if a previous file was used in sparse mode. */
-void AIO_WritePool_setFile(WritePoolCtx_t *ctx, FILE* file) {
+void AIO_WritePool_setFile(WritePoolCtx_tctx, FILE* file) {
     AIO_IOPool_setFile(&ctx->base, file);
     assert(ctx->storedSkips == 0);
 }
 
 /* AIO_WritePool_getFile:
  * Returns the file the writePool is currently set to write to. */
-FILE* AIO_WritePool_getFile(WritePoolCtx_t *ctx) {
+FILE* AIO_WritePool_getFile(const WritePoolCtx_t* ctx) {
     return AIO_IOPool_getFile(&ctx->base);
 }
 
 /* AIO_WritePool_releaseIoJob:
  * Releases an acquired job back to the pool. Doesn't execute the job. */
-void AIO_WritePool_releaseIoJob(IOJob_t *job) {
+void AIO_WritePool_releaseIoJob(IOJob_tjob) {
     AIO_IOPool_releaseIoJob(job);
 }
 
 /* AIO_WritePool_closeFile:
  * Ends sparse write and closes the writePool's current file and sets the file to NULL.
  * Requires completion of all queues write jobs and release of all otherwise acquired jobs.  */
-int AIO_WritePool_closeFile(WritePoolCtx_t *ctx) {
-    FILE *dstFile = ctx->base.file;
+int AIO_WritePool_closeFile(WritePoolCtx_tctx) {
+    FILE* const dstFile = ctx->base.file;
     assert(dstFile!=NULL || ctx->base.prefs->testMode!=0);
     AIO_WritePool_sparseWriteEnd(ctx);
     AIO_IOPool_setFile(&ctx->base, NULL);
@@ -337,16 +341,16 @@ int AIO_WritePool_closeFile(WritePoolCtx_t *ctx) {
 /* AIO_WritePool_executeWriteJob:
  * Executes a write job synchronously. Can be used as a function for a thread pool. */
 static void AIO_WritePool_executeWriteJob(void* opaque){
-    IOJob_t* job = (IOJob_t*) opaque;
-    WritePoolCtx_t* ctx = (WritePoolCtx_t*) job->ctx;
+    IOJob_t* const job = (IOJob_t*) opaque;
+    WritePoolCtx_t* const ctx = (WritePoolCtx_t*) job->ctx;
     ctx->storedSkips = AIO_fwriteSparse(job->file, job->buffer, job->usedBufferSize, ctx->base.prefs, ctx->storedSkips);
     AIO_IOPool_releaseIoJob(job);
 }
 
 /* AIO_WritePool_create:
  * Allocates and sets and a new write pool including its included jobs. */
-WritePoolCtx_t* AIO_WritePool_create(FIO_prefs_t* const prefs, size_t bufferSize) {
-    WritePoolCtx_t* ctx = (WritePoolCtx_t*) malloc(sizeof(WritePoolCtx_t));
+WritePoolCtx_t* AIO_WritePool_create(const FIO_prefs_t* prefs, size_t bufferSize) {
+    WritePoolCtx_t* const ctx = (WritePoolCtx_t*) malloc(sizeof(WritePoolCtx_t));
     if(!ctx) EXM_THROW(100, "Allocation error : not enough memory");
     AIO_IOPool_init(&ctx->base, prefs, AIO_WritePool_executeWriteJob, bufferSize);
     ctx->storedSkips = 0;
@@ -363,3 +367,256 @@ void AIO_WritePool_free(WritePoolCtx_t* ctx) {
     assert(ctx->storedSkips==0);
     free(ctx);
 }
+
+
+/* ***********************************
+ *  ReadPool implementation
+ *************************************/
+static void AIO_ReadPool_releaseAllCompletedJobs(ReadPoolCtx_t* ctx) {
+    int i;
+    for(i=0; i<ctx->completedJobsCount; i++) {
+        IOJob_t* job = (IOJob_t*) ctx->completedJobs[i];
+        AIO_IOPool_releaseIoJob(job);
+    }
+    ctx->completedJobsCount = 0;
+}
+
+static void AIO_ReadPool_addJobToCompleted(IOJob_t* job) {
+    ReadPoolCtx_t* const ctx = (ReadPoolCtx_t *)job->ctx;
+    if(ctx->base.threadPool)
+        ZSTD_pthread_mutex_lock(&ctx->base.ioJobsMutex);
+    assert(ctx->completedJobsCount < MAX_IO_JOBS);
+    ctx->completedJobs[ctx->completedJobsCount++] = job;
+    if(ctx->base.threadPool) {
+        ZSTD_pthread_cond_signal(&ctx->jobCompletedCond);
+        ZSTD_pthread_mutex_unlock(&ctx->base.ioJobsMutex);
+    }
+}
+
+/* AIO_ReadPool_findNextWaitingOffsetCompletedJob_locked:
+ * Looks through the completed jobs for a job matching the waitingOnOffset and returns it,
+ * if job wasn't found returns NULL.
+ * IMPORTANT: assumes ioJobsMutex is locked. */
+static IOJob_t* AIO_ReadPool_findNextWaitingOffsetCompletedJob_locked(ReadPoolCtx_t* ctx) {
+    IOJob_t *job = NULL;
+    int i;
+    /* This implementation goes through all completed jobs and looks for the one matching the next offset.
+     * While not strictly needed for a single threaded reader implementation (as in such a case we could expect
+     * reads to be completed in order) this implementation was chosen as it better fits other asyncio
+     * interfaces (such as io_uring) that do not provide promises regarding order of completion. */
+    for (i=0; i<ctx->completedJobsCount; i++) {
+        job = (IOJob_t *) ctx->completedJobs[i];
+        if (job->offset == ctx->waitingOnOffset) {
+            ctx->completedJobs[i] = ctx->completedJobs[--ctx->completedJobsCount];
+            return job;
+        }
+    }
+    return NULL;
+}
+
+/* AIO_ReadPool_numReadsInFlight:
+ * Returns the number of IO read jobs currrently in flight. */
+static size_t AIO_ReadPool_numReadsInFlight(ReadPoolCtx_t* ctx) {
+    const size_t jobsHeld = (ctx->currentJobHeld==NULL ? 0 : 1);
+    return ctx->base.totalIoJobs - (ctx->base.availableJobsCount + ctx->completedJobsCount + jobsHeld);
+}
+
+/* AIO_ReadPool_getNextCompletedJob:
+ * Returns a completed IOJob_t for the next read in line based on waitingOnOffset and advances waitingOnOffset.
+ * Would block. */
+static IOJob_t* AIO_ReadPool_getNextCompletedJob(ReadPoolCtx_t* ctx) {
+    IOJob_t *job = NULL;
+    if (ctx->base.threadPool)
+        ZSTD_pthread_mutex_lock(&ctx->base.ioJobsMutex);
+
+    job = AIO_ReadPool_findNextWaitingOffsetCompletedJob_locked(ctx);
+
+    /* As long as we didn't find the job matching the next read, and we have some reads in flight continue waiting */
+    while (!job && (AIO_ReadPool_numReadsInFlight(ctx) > 0)) {
+        assert(ctx->base.threadPool != NULL); /* we shouldn't be here if we work in sync mode */
+        ZSTD_pthread_cond_wait(&ctx->jobCompletedCond, &ctx->base.ioJobsMutex);
+        job = AIO_ReadPool_findNextWaitingOffsetCompletedJob_locked(ctx);
+    }
+
+    if(job) {
+        assert(job->offset == ctx->waitingOnOffset);
+        ctx->waitingOnOffset += job->usedBufferSize;
+    }
+
+    if (ctx->base.threadPool)
+        ZSTD_pthread_mutex_unlock(&ctx->base.ioJobsMutex);
+    return job;
+}
+
+
+/* AIO_ReadPool_executeReadJob:
+ * Executes a read job synchronously. Can be used as a function for a thread pool. */
+static void AIO_ReadPool_executeReadJob(void* opaque){
+    IOJob_t* const job = (IOJob_t*) opaque;
+    ReadPoolCtx_t* const ctx = (ReadPoolCtx_t *)job->ctx;
+    if(ctx->reachedEof) {
+        job->usedBufferSize = 0;
+        AIO_ReadPool_addJobToCompleted(job);
+        return;
+    }
+    job->usedBufferSize = fread(job->buffer, 1, job->bufferSize, job->file);
+    if(job->usedBufferSize < job->bufferSize) {
+        if(ferror(job->file)) {
+            EXM_THROW(37, "Read error");
+        } else if(feof(job->file)) {
+            ctx->reachedEof = 1;
+        } else {
+            EXM_THROW(37, "Unexpected short read");
+        }
+    }
+    AIO_ReadPool_addJobToCompleted(job);
+}
+
+static void AIO_ReadPool_enqueueRead(ReadPoolCtx_t* ctx) {
+    IOJob_t* const job = AIO_IOPool_acquireJob(&ctx->base);
+    job->offset = ctx->nextReadOffset;
+    ctx->nextReadOffset += job->bufferSize;
+    AIO_IOPool_enqueueJob(job);
+}
+
+static void AIO_ReadPool_startReading(ReadPoolCtx_t* ctx) {
+    int i;
+    for (i = 0; i < ctx->base.availableJobsCount; i++) {
+        AIO_ReadPool_enqueueRead(ctx);
+    }
+}
+
+/* AIO_ReadPool_setFile:
+ * Sets the source file for future read in the pool. Initiates reading immediately if file is not NULL.
+ * Waits for all current enqueued tasks to complete if a previous file was set. */
+void AIO_ReadPool_setFile(ReadPoolCtx_t* ctx, FILE* file) {
+    assert(ctx!=NULL);
+    AIO_IOPool_join(&ctx->base);
+    AIO_ReadPool_releaseAllCompletedJobs(ctx);
+    if (ctx->currentJobHeld) {
+        AIO_IOPool_releaseIoJob((IOJob_t *)ctx->currentJobHeld);
+        ctx->currentJobHeld = NULL;
+    }
+    AIO_IOPool_setFile(&ctx->base, file);
+    ctx->nextReadOffset = 0;
+    ctx->waitingOnOffset = 0;
+    ctx->srcBuffer = ctx->coalesceBuffer;
+    ctx->srcBufferLoaded = 0;
+    ctx->reachedEof = 0;
+    if(file != NULL)
+        AIO_ReadPool_startReading(ctx);
+}
+
+/* AIO_ReadPool_create:
+ * Allocates and sets and a new readPool including its included jobs.
+ * bufferSize should be set to the maximal buffer we want to read at a time, will also be used
+ * as our basic read size. */
+ReadPoolCtx_t* AIO_ReadPool_create(const FIO_prefs_t* prefs, size_t bufferSize) {
+    ReadPoolCtx_t* const ctx = (ReadPoolCtx_t*) malloc(sizeof(ReadPoolCtx_t));
+    if(!ctx) EXM_THROW(100, "Allocation error : not enough memory");
+    AIO_IOPool_init(&ctx->base, prefs, AIO_ReadPool_executeReadJob, bufferSize);
+
+    ctx->coalesceBuffer = (U8*) malloc(bufferSize * 2);
+    ctx->srcBuffer = ctx->coalesceBuffer;
+    ctx->srcBufferLoaded = 0;
+    ctx->completedJobsCount = 0;
+    ctx->currentJobHeld = NULL;
+
+    if(ctx->base.threadPool)
+        if (ZSTD_pthread_cond_init(&ctx->jobCompletedCond, NULL))
+            EXM_THROW(103,"Failed creating write jobCompletedCond mutex");
+
+    return ctx;
+}
+
+/* AIO_ReadPool_free:
+ * Frees and releases a readPool and its resources. Closes source file. */
+void AIO_ReadPool_free(ReadPoolCtx_t* ctx) {
+    if(AIO_ReadPool_getFile(ctx))
+        AIO_ReadPool_closeFile(ctx);
+    if(ctx->base.threadPool)
+        ZSTD_pthread_cond_destroy(&ctx->jobCompletedCond);
+    AIO_IOPool_destroy(&ctx->base);
+    free(ctx->coalesceBuffer);
+    free(ctx);
+}
+
+/* AIO_ReadPool_consumeBytes:
+ * Consumes byes from srcBuffer's beginning and updates srcBufferLoaded accordingly. */
+void AIO_ReadPool_consumeBytes(ReadPoolCtx_t* ctx, size_t n) {
+    assert(n <= ctx->srcBufferLoaded);
+    ctx->srcBufferLoaded -= n;
+    ctx->srcBuffer += n;
+}
+
+/* AIO_ReadPool_releaseCurrentlyHeldAndGetNext:
+ * Release the current held job and get the next one, returns NULL if no next job available. */
+static IOJob_t* AIO_ReadPool_releaseCurrentHeldAndGetNext(ReadPoolCtx_t* ctx) {
+    if (ctx->currentJobHeld) {
+        AIO_IOPool_releaseIoJob((IOJob_t *)ctx->currentJobHeld);
+        ctx->currentJobHeld = NULL;
+        AIO_ReadPool_enqueueRead(ctx);
+    }
+    ctx->currentJobHeld = AIO_ReadPool_getNextCompletedJob(ctx);
+    return (IOJob_t*) ctx->currentJobHeld;
+}
+
+/* AIO_ReadPool_fillBuffer:
+ * Tries to fill the buffer with at least n or jobBufferSize bytes (whichever is smaller).
+ * Returns if srcBuffer has at least the expected number of bytes loaded or if we've reached the end of the file.
+ * Return value is the number of bytes added to the buffer.
+ * Note that srcBuffer might have up to 2 times jobBufferSize bytes. */
+size_t AIO_ReadPool_fillBuffer(ReadPoolCtx_t* ctx, size_t n) {
+    IOJob_t *job;
+    int useCoalesce = 0;
+    if(n > ctx->base.jobBufferSize)
+        n = ctx->base.jobBufferSize;
+
+    /* We are good, don't read anything */
+    if (ctx->srcBufferLoaded >= n)
+        return 0;
+
+    /* We still have bytes loaded, but not enough to satisfy caller. We need to get the next job
+     * and coalesce the remaining bytes with the next job's buffer */
+    if (ctx->srcBufferLoaded > 0) {
+        useCoalesce = 1;
+        memcpy(ctx->coalesceBuffer, ctx->srcBuffer, ctx->srcBufferLoaded);
+        ctx->srcBuffer = ctx->coalesceBuffer;
+    }
+
+    /* Read the next chunk */
+    job = AIO_ReadPool_releaseCurrentHeldAndGetNext(ctx);
+    if(!job)
+        return 0;
+    if(useCoalesce) {
+        assert(ctx->srcBufferLoaded + job->usedBufferSize <= 2*ctx->base.jobBufferSize);
+        memcpy(ctx->coalesceBuffer + ctx->srcBufferLoaded, job->buffer, job->usedBufferSize);
+        ctx->srcBufferLoaded += job->usedBufferSize;
+    }
+    else {
+        ctx->srcBuffer = (U8 *) job->buffer;
+        ctx->srcBufferLoaded = job->usedBufferSize;
+    }
+    return job->usedBufferSize;
+}
+
+/* AIO_ReadPool_consumeAndRefill:
+ * Consumes the current buffer and refills it with bufferSize bytes. */
+size_t AIO_ReadPool_consumeAndRefill(ReadPoolCtx_t* ctx) {
+    AIO_ReadPool_consumeBytes(ctx, ctx->srcBufferLoaded);
+    return AIO_ReadPool_fillBuffer(ctx, ctx->base.jobBufferSize);
+}
+
+/* AIO_ReadPool_getFile:
+ * Returns the current file set for the read pool. */
+FILE* AIO_ReadPool_getFile(const ReadPoolCtx_t* ctx) {
+    return AIO_IOPool_getFile(&ctx->base);
+}
+
+/* AIO_ReadPool_closeFile:
+ * Closes the current set file. Waits for all current enqueued tasks to complete and resets state. */
+int AIO_ReadPool_closeFile(ReadPoolCtx_t* ctx) {
+    FILE* const file = AIO_ReadPool_getFile(ctx);
+    AIO_ReadPool_setFile(ctx, NULL);
+    return fclose(file);
+}
index 3e91164c5581c5b508075b778fd852b4fa1702ab..bf07f85947ba533e3655d04c6917e8590b318eb9 100644 (file)
@@ -1,5 +1,5 @@
 /*
- * Copyright (c) Yann Collet, Facebook, Inc.
+ * Copyright (c) Facebook, Inc.
  * All rights reserved.
  *
  * This source code is licensed under both the BSD-style license (found in the
@@ -28,7 +28,7 @@ typedef struct {
     /* These struct fields should be set only on creation and not changed afterwards */
     POOL_ctx* threadPool;
     int totalIoJobs;
-    FIO_prefs_t* prefs;
+    const FIO_prefs_t* prefs;
     POOL_function poolFunction;
 
     /* Controls the file we currently write to, make changes only by using provided utility functions */
@@ -39,8 +39,36 @@ typedef struct {
     ZSTD_pthread_mutex_t ioJobsMutex;
     void* availableJobs[MAX_IO_JOBS];
     int availableJobsCount;
+    size_t jobBufferSize;
 } IOPoolCtx_t;
 
+typedef struct {
+    IOPoolCtx_t base;
+
+    /* State regarding the currently read file */
+    int reachedEof;
+    U64 nextReadOffset;
+    U64 waitingOnOffset;
+
+    /* We may hold an IOJob object as needed if we actively expose its buffer. */
+    void *currentJobHeld;
+
+    /* Coalesce buffer is used to join two buffers in case where we need to read more bytes than left in
+     * the first of them. Shouldn't be accessed from outside ot utility functions. */
+    U8 *coalesceBuffer;
+
+    /* Read buffer can be used by consumer code, take care when copying this pointer aside as it might
+     * change when consuming / refilling buffer. */
+    U8 *srcBuffer;
+    size_t srcBufferLoaded;
+
+    /* We need to know what tasks completed so we can use their buffers when their time comes.
+     * Should only be accessed after locking base.ioJobsMutex . */
+    void* completedJobs[MAX_IO_JOBS];
+    int completedJobsCount;
+    ZSTD_pthread_cond_t jobCompletedCond;
+} ReadPoolCtx_t;
+
 typedef struct {
     IOPoolCtx_t base;
     unsigned storedSkips;
@@ -59,15 +87,10 @@ typedef struct {
     U64 offset;
 } IOJob_t;
 
-/** AIO_fwriteSparse() :
-*  @return : storedSkips,
-*            argument for next call to AIO_fwriteSparse() or AIO_fwriteSparseEnd() */
-unsigned AIO_fwriteSparse(FILE* file,
-                 const void* buffer, size_t bufferSize,
-                 const FIO_prefs_t* const prefs,
-                 unsigned storedSkips);
+/* AIO_supported:
+ * Returns 1 if AsyncIO is supported on the system, 0 otherwise. */
+int AIO_supported(void);
 
-void AIO_fwriteSparseEnd(const FIO_prefs_t* const prefs, FILE* file, unsigned storedSkips);
 
 /* AIO_WritePool_releaseIoJob:
  * Releases an acquired job back to the pool. Doesn't execute the job. */
@@ -97,7 +120,7 @@ void AIO_WritePool_setFile(WritePoolCtx_t *ctx, FILE* file);
 
 /* AIO_WritePool_getFile:
  * Returns the file the writePool is currently set to write to. */
-FILE* AIO_WritePool_getFile(WritePoolCtx_t *ctx);
+FILE* AIO_WritePool_getFile(const WritePoolCtx_t* ctx);
 
 /* AIO_WritePool_closeFile:
  * Ends sparse write and closes the writePool's current file and sets the file to NULL.
@@ -107,12 +130,50 @@ int AIO_WritePool_closeFile(WritePoolCtx_t *ctx);
 /* AIO_WritePool_create:
  * Allocates and sets and a new write pool including its included jobs.
  * bufferSize should be set to the maximal buffer we want to write to at a time. */
-WritePoolCtx_t* AIO_WritePool_create(FIO_prefs_t* const prefs, size_t bufferSize);
+WritePoolCtx_t* AIO_WritePool_create(const FIO_prefs_t* prefs, size_t bufferSize);
 
 /* AIO_WritePool_free:
  * Frees and releases a writePool and its resources. Closes destination file. */
 void AIO_WritePool_free(WritePoolCtx_t* ctx);
 
+/* AIO_ReadPool_create:
+ * Allocates and sets and a new readPool including its included jobs.
+ * bufferSize should be set to the maximal buffer we want to read at a time, will also be used
+ * as our basic read size. */
+ReadPoolCtx_t* AIO_ReadPool_create(const FIO_prefs_t* prefs, size_t bufferSize);
+
+/* AIO_ReadPool_free:
+ * Frees and releases a readPool and its resources. Closes source file. */
+void AIO_ReadPool_free(ReadPoolCtx_t* ctx);
+
+/* AIO_ReadPool_consumeBytes:
+ * Consumes byes from srcBuffer's beginning and updates srcBufferLoaded accordingly. */
+void AIO_ReadPool_consumeBytes(ReadPoolCtx_t *ctx, size_t n);
+
+/* AIO_ReadPool_fillBuffer:
+ * Makes sure buffer has at least n bytes loaded (as long as n is not bigger than the initalized bufferSize).
+ * Returns if srcBuffer has at least n bytes loaded or if we've reached the end of the file.
+ * Return value is the number of bytes added to the buffer.
+ * Note that srcBuffer might have up to 2 times bufferSize bytes. */
+size_t AIO_ReadPool_fillBuffer(ReadPoolCtx_t *ctx, size_t n);
+
+/* AIO_ReadPool_consumeAndRefill:
+ * Consumes the current buffer and refills it with bufferSize bytes. */
+size_t AIO_ReadPool_consumeAndRefill(ReadPoolCtx_t *ctx);
+
+/* AIO_ReadPool_setFile:
+ * Sets the source file for future read in the pool. Initiates reading immediately if file is not NULL.
+ * Waits for all current enqueued tasks to complete if a previous file was set. */
+void AIO_ReadPool_setFile(ReadPoolCtx_t *ctx, FILE* file);
+
+/* AIO_ReadPool_getFile:
+ * Returns the current file set for the read pool. */
+FILE* AIO_ReadPool_getFile(const ReadPoolCtx_t *ctx);
+
+/* AIO_ReadPool_closeFile:
+ * Closes the current set file. Waits for all current enqueued tasks to complete and resets state. */
+int AIO_ReadPool_closeFile(ReadPoolCtx_t *ctx);
+
 #if defined (__cplusplus)
 }
 #endif
index d33c19d7bd1e4e7ea56216c6ca1914a633a36002..282c2f13b4d507d16e163f0eaff1d01c3eb2cd48 100644 (file)
@@ -1,5 +1,5 @@
 /*
- * Copyright (c) Yann Collet, Facebook, Inc.
+ * Copyright (c) Facebook, Inc.
  * All rights reserved.
  *
  * This source code is licensed under both the BSD-style license (found in the
index 1909ab1ab5ad89fae6e5f9217d39313abe262809..cf566aa206303bc5e3db9e53b218b9b754c305af 100644 (file)
@@ -1,5 +1,5 @@
 /*
- * Copyright (c) Yann Collet, Facebook, Inc.
+ * Copyright (c) Facebook, Inc.
  * All rights reserved.
  *
  * This source code is licensed under both the BSD-style license (found in the
@@ -70,4 +70,4 @@ typedef struct FIO_prefs_s {
     int allowBlockDevices;
 } FIO_prefs_t;
 
-#endif /* FILEIO_TYPES_HEADER */
\ No newline at end of file
+#endif /* FILEIO_TYPES_HEADER */
index 2e3f4ddb7b076bb483c64701b5eed4f6898a94c3..29da261dfbd8caf703fa5c9d9e66f7930ce4967b 100644 (file)
@@ -46,6 +46,7 @@
 #  include "zstdcli_trace.h"
 #endif
 #include "../lib/zstd.h"  /* ZSTD_VERSION_STRING, ZSTD_minCLevel, ZSTD_maxCLevel */
+#include "fileio_asyncio.h"
 
 
 /*-************************************
@@ -179,7 +180,8 @@ static void usage_advanced(const char* programName)
 #ifdef UTIL_HAS_MIRRORFILELIST
     DISPLAYOUT( "--output-dir-mirror DIR : processed files are stored into DIR respecting original directory structure \n");
 #endif
-
+    if (AIO_supported())
+        DISPLAYOUT( "--[no-]asyncio : use asynchronous IO (default: enabled) \n");
 
 #ifndef ZSTD_NOCOMPRESS
     DISPLAYOUT( "--[no-]check : during compression, add XXH64 integrity checksum to frame (default: enabled)");
@@ -242,9 +244,6 @@ static void usage_advanced(const char* programName)
     DISPLAYOUT( " -l        : print information about zstd compressed files \n");
     DISPLAYOUT( "--test     : test compressed file integrity \n");
     DISPLAYOUT( " -M#       : Set a memory usage limit for decompression \n");
-#ifdef ZSTD_MULTITHREAD
-    DISPLAYOUT( "--[no-]asyncio  : use threaded asynchronous IO for output (default: disabled) \n");
-#endif
 # if ZSTD_SPARSE_DEFAULT
     DISPLAYOUT( "--[no-]sparse : sparse mode (default: enabled on file, disabled on stdout) \n");
 # else
@@ -1459,6 +1458,7 @@ int main(int argCount, const char* argv[])
         FIO_setTargetCBlockSize(prefs, targetCBlockSize);
         FIO_setSrcSizeHint(prefs, srcSizeHint);
         FIO_setLiteralCompressionMode(prefs, literalCompressionMode);
+        FIO_setSparseWrite(prefs, 0);
         if (adaptMin > cLevel) cLevel = adaptMin;
         if (adaptMax < cLevel) cLevel = adaptMax;
 
index f04f1da72525ab8aa747b670ae7dac4e74a61a69..f97e96e374a47f8dae0e32aef3f7a6137c61be7d 100755 (executable)
@@ -260,10 +260,13 @@ zstd -dc - < tmp.zst > $INTOVOID
 zstd -d    < tmp.zst > $INTOVOID   # implicit stdout when stdin is used
 zstd -d  - < tmp.zst > $INTOVOID
 println "test : impose memory limitation (must fail)"
-zstd -d -f tmp.zst -M2K -c > $INTOVOID && die "decompression needs more memory than allowed"
-zstd -d -f tmp.zst --memlimit=2K -c > $INTOVOID && die "decompression needs more memory than allowed"  # long command
-zstd -d -f tmp.zst --memory=2K -c > $INTOVOID && die "decompression needs more memory than allowed"  # long command
-zstd -d -f tmp.zst --memlimit-decompress=2K -c > $INTOVOID && die "decompression needs more memory than allowed"  # long command
+datagen -g500K > tmplimit
+zstd -f tmplimit
+zstd -d -f tmplimit.zst -M2K -c > $INTOVOID && die "decompression needs more memory than allowed"
+zstd -d -f tmplimit.zst --memlimit=2K -c > $INTOVOID && die "decompression needs more memory than allowed"  # long command
+zstd -d -f tmplimit.zst --memory=2K -c > $INTOVOID && die "decompression needs more memory than allowed"  # long command
+zstd -d -f tmplimit.zst --memlimit-decompress=2K -c > $INTOVOID && die "decompression needs more memory than allowed"  # long command
+rm -f tmplimit tmplimit.zst
 println "test : overwrite protection"
 zstd -q tmp && die "overwrite check failed!"
 println "test : force overwrite"
@@ -1596,11 +1599,11 @@ elif [ "$longCSize19wlog23" -gt "$optCSize19wlog23" ]; then
     exit 1
 fi
 
-println "\n===>  zstd asyncio decompression tests "
+println "\n===>  zstd asyncio tests "
 
 addFrame() {
     datagen -g2M -s$2 >> tmp_uncompressed
-    datagen -g2M -s$2 | zstd --format=$1 >> tmp_compressed.zst
+    datagen -g2M -s$2 | zstd -1 --format=$1 >> tmp_compressed.zst
 }
 
 addTwoFrames() {