]> git.ipfire.org Git - thirdparty/zstd.git/commitdiff
zstd cli can increase level when input is too slow
authorYann Collet <cyan@fb.com>
Thu, 9 Aug 2018 22:51:30 +0000 (15:51 -0700)
committerYann Collet <cyan@fb.com>
Thu, 9 Aug 2018 22:51:30 +0000 (15:51 -0700)
lib/compress/zstd_compress.c
lib/compress/zstdmt_compress.c
lib/zstd.h
programs/fileio.c

index ed3aab871b9e4ca5e58c10b11d7d7f2e5afd83d8..2121fe74989ed33e1088708515c4ec9a4c2855f5 100644 (file)
@@ -900,6 +900,7 @@ ZSTD_frameProgression ZSTD_getFrameProgression(const ZSTD_CCtx* cctx)
         fp.ingested = cctx->consumedSrcSize + buffered;
         fp.consumed = cctx->consumedSrcSize;
         fp.produced = cctx->producedCSize;
+        fp.currentJobID = 0;
         return fp;
 }   }
 
index 74f9dc29c23ca5ef60dd6ecab3f8c1b4559e19e7..d2f06e4ee72777846eaab2a58ffeeaa53f5c00ad 100644 (file)
@@ -1083,6 +1083,7 @@ ZSTD_frameProgression ZSTDMT_getFrameProgression(ZSTDMT_CCtx* mtctx)
     fps.ingested = mtctx->consumed + mtctx->inBuff.filled;
     fps.consumed = mtctx->consumed;
     fps.produced = mtctx->produced;
+    fps.currentJobID = mtctx->nextJobID;
     {   unsigned jobNb;
         unsigned lastJobNb = mtctx->nextJobID + mtctx->jobReady; assert(mtctx->jobReady <= 1);
         DEBUGLOG(6, "ZSTDMT_getFrameProgression: jobs: from %u to <%u (jobReady:%u)",
index 0c20bb76841a6b3ff7014fe37e3d0041cc08a73f..edd0079c957a6be9c1d71fe4d34252c3e6fdcdb0 100644 (file)
@@ -735,6 +735,7 @@ typedef struct {
     unsigned long long ingested;
     unsigned long long consumed;
     unsigned long long produced;
+    unsigned currentJobID;
 } ZSTD_frameProgression;
 
 /* ZSTD_getFrameProgression():
index c1587f8f89724f2d923eaa5051c42a4e092f7a32..b62e25703baa2202a5a0b3e220d376bdf1caa486 100644 (file)
@@ -737,6 +737,9 @@ FIO_compressZstdFrame(const cRess_t* ressPtr,
     FILE* const dstFile = ress.dstFile;
     U64 compressedfilesize = 0;
     ZSTD_EndDirective directive = ZSTD_e_continue;
+    unsigned inputBlocked = 0;
+    unsigned lastJobID = 0;
+
     DISPLAYLEVEL(6, "compression using zstd format \n");
 
     /* init */
@@ -747,7 +750,7 @@ FIO_compressZstdFrame(const cRess_t* ressPtr,
 
     /* Main compression loop */
     do {
-        size_t result;
+        size_t stillToFlush;
         /* Fill input Buffer */
         size_t const inSize = fread(ress.srcBuffer, (size_t)1, ress.srcBufferSize, srcFile);
         ZSTD_inBuffer inBuff = { ress.srcBuffer, inSize, 0 };
@@ -757,14 +760,18 @@ FIO_compressZstdFrame(const cRess_t* ressPtr,
         if ((inSize == 0) || (*readsize == fileSize))
             directive = ZSTD_e_end;
 
-        result = 1;
+        stillToFlush = 1;
         while ((inBuff.pos != inBuff.size)   /* input buffer must be entirely ingested */
-            || (directive == ZSTD_e_end && result != 0) ) {
+            || (directive == ZSTD_e_end && stillToFlush != 0) ) {
+            size_t const oldIPos = inBuff.pos;
             ZSTD_outBuffer outBuff = { ress.dstBuffer, ress.dstBufferSize, 0 };
-            CHECK_V(result, ZSTD_compress_generic(ress.cctx, &outBuff, &inBuff, directive));
+            CHECK_V(stillToFlush, ZSTD_compress_generic(ress.cctx, &outBuff, &inBuff, directive));
+
+            /* count stats */
+            if (oldIPos == inBuff.pos) inputBlocked++;
 
             /* Write compressed stream */
-            DISPLAYLEVEL(6, "ZSTD_compress_generic(end:%u) => intput pos(%u)<=(%u)size ; output generated %u bytes \n",
+            DISPLAYLEVEL(6, "ZSTD_compress_generic(end:%u) => input pos(%u)<=(%u)size ; output generated %u bytes \n",
                             (U32)directive, (U32)inBuff.pos, (U32)inBuff.size, (U32)outBuff.pos);
             if (outBuff.pos) {
                 size_t const sizeCheck = fwrite(ress.dstBuffer, 1, outBuff.pos, dstFile);
@@ -775,6 +782,18 @@ FIO_compressZstdFrame(const cRess_t* ressPtr,
             if (READY_FOR_UPDATE()) {
                 ZSTD_frameProgression const zfp = ZSTD_getFrameProgression(ress.cctx);
                 double const cShare = (double)zfp.produced / (zfp.consumed + !zfp.consumed/*avoid div0*/) * 100;
+
+                /* check input speed */
+                if (zfp.currentJobID >= lastJobID+2) {
+                    if (inputBlocked<=1) {   /* small tolerance */
+                        DISPLAYLEVEL(6, "input is never blocked => input is too slow \n");
+                        compressionLevel++;
+                        ZSTD_CCtx_setParameter(ress.cctx, ZSTD_p_compressionLevel, (unsigned)compressionLevel);
+                    }
+                    lastJobID = zfp.currentJobID;
+                    inputBlocked = 0;
+                }
+
                 if (g_displayLevel >= 3) {
                     DISPLAYUPDATE(3, "\r(L%i) Buffered :%4u MB - Consumed :%4u MB - Compressed :%4u MB => %.2f%%",
                                 compressionLevel,