]> git.ipfire.org Git - thirdparty/zstd.git/commitdiff
slow down faster when output speed is limited
authorYann Collet <cyan@fb.com>
Fri, 10 Aug 2018 00:44:30 +0000 (17:44 -0700)
committerYann Collet <cyan@fb.com>
Fri, 10 Aug 2018 00:44:30 +0000 (17:44 -0700)
programs/fileio.c

index fe3c13527431486acba18ed3d9b0c8692389eef1..c12126168f3364108b9f6dbf9e2b3a0771398da4 100644 (file)
@@ -737,8 +737,13 @@ FIO_compressZstdFrame(const cRess_t* ressPtr,
     FILE* const dstFile = ress.dstFile;
     U64 compressedfilesize = 0;
     ZSTD_EndDirective directive = ZSTD_e_continue;
+
+    typedef enum { noChange, slower, faster } speedChange_e;
+    speedChange_e speedChange = noChange;
     unsigned inputBlocked = 0;
     unsigned lastJobID = 0;
+    unsigned long long lastProduced = 0;
+    unsigned long long lastFlushedSize = 0;
 
     DISPLAYLEVEL(6, "compression using zstd format \n");
 
@@ -763,6 +768,7 @@ FIO_compressZstdFrame(const cRess_t* ressPtr,
         stillToFlush = 1;
         while ((inBuff.pos != inBuff.size)   /* input buffer must be entirely ingested */
             || (directive == ZSTD_e_end && stillToFlush != 0) ) {
+
             size_t const oldIPos = inBuff.pos;
             ZSTD_outBuffer outBuff = { ress.dstBuffer, ress.dstBufferSize, 0 };
             CHECK_V(stillToFlush, ZSTD_compress_generic(ress.cctx, &outBuff, &inBuff, directive));
@@ -779,23 +785,55 @@ FIO_compressZstdFrame(const cRess_t* ressPtr,
                     EXM_THROW(25, "Write error : cannot write compressed block");
                 compressedfilesize += outBuff.pos;
             }
+
+            /* display notification; and adapt compression level */
             if (READY_FOR_UPDATE()) {
                 ZSTD_frameProgression const zfp = ZSTD_getFrameProgression(ress.cctx);
                 double const cShare = (double)zfp.produced / (zfp.consumed + !zfp.consumed/*avoid div0*/) * 100;
 
-                /* check input speed */
-                if (zfp.currentJobID >= lastJobID+2) {
-                    if (inputBlocked<=1) {   /* small tolerance */
-                        DISPLAYLEVEL(6, "input is never blocked => input is too slow \n");
-                        compressionLevel++;
+                /* check output speed */
+                if (zfp.currentJobID > 0) {
+                    unsigned long long newlyProduced = zfp.produced - lastProduced;
+                    unsigned long long newlyFlushed = compressedfilesize - lastFlushedSize;
+                    assert(zfp.produced >= lastProduced);
+                    if (newlyProduced == 0) {
+                        DISPLAYLEVEL(6, "no more data compression generation => buffers are full, compression waiting => output (or input) too slow \n")
+                        speedChange = slower;
+                    }
+
+                    if ( (newlyProduced > (newlyFlushed * 9 / 8))
+                      && (stillToFlush > ZSTD_BLOCKSIZE_MAX) ) {
+                        DISPLAYLEVEL(6, "production faster than flushing (%llu > %llu) \n", newlyProduced, newlyFlushed);
+                        speedChange = slower;
+                    }
+                    lastProduced = zfp.produced;
+                    lastFlushedSize = compressedfilesize;
+                }
+
+                /* course correct only if there is at least one job completed */
+                if (zfp.currentJobID > lastJobID) {
+                    DISPLAYLEVEL(6, "compression level adaptation check \n")
+
+                    /* check input speed */
+                    if (zfp.currentJobID > g_nbWorkers+1) {   /* warm up period, to fill all workers */
+                        if (inputBlocked <= 1) {   /* small tolerance */
+                            DISPLAYLEVEL(6, "input is never blocked => input is too slow \n");
+                            speedChange = slower;
+                        }
+                        inputBlocked = 0;
+                    }
+
+                    if (speedChange == slower) {
+                        DISPLAYLEVEL(6, "slower speed , higher compression \n")
+                        compressionLevel ++;
                         ZSTD_CCtx_setParameter(ress.cctx, ZSTD_p_compressionLevel, (unsigned)compressionLevel);
+                        speedChange = noChange;
                     }
                     lastJobID = zfp.currentJobID;
-                    inputBlocked = 0;
-                }
+                }    /* if (zfp.currentJobID > lastJobID) */
 
                 if (g_displayLevel >= 3) {
-                    DISPLAYUPDATE(3, "\r(L%i) Buffered :%4u MB - Consumed :%4u MB - Compressed :%4u MB => %.2f%%",
+                    DISPLAYUPDATE(3, "\r(L%i) Buffered :%4u MB - Consumed :%4u MB - Compressed :%4u MB => %.2f%% ",
                                 compressionLevel,
                                 (U32)((zfp.ingested - zfp.consumed) >> 20),
                                 (U32)(zfp.consumed >> 20),