]> git.ipfire.org Git - thirdparty/zstd.git/commitdiff
[fileio] Separate parameter adaption from display update rate
authorNick Terrell <terrelln@fb.com>
Wed, 14 Dec 2022 23:17:05 +0000 (15:17 -0800)
committerNick Terrell <nickrterrell@gmail.com>
Thu, 15 Dec 2022 01:08:21 +0000 (17:08 -0800)
Split the logic for parameter adaption from the logic to update the display rate.
This decouples the two updates, so changes to display updates don't affect
parameter adaption.

Also add a test case that checks that parameter adaption actually happens.

This fixes Issue #3353, where --adapt is broken when --no-progress is passed.

programs/fileio.c
programs/fileio_common.h
tests/cli-tests/compression/adapt.sh

index 4c1331432e2545e51ac7bd411186acba095b5bbb..15a0d5387cbe0a6d88d5ddbbf7badae64f883b8b 100644 (file)
@@ -1301,6 +1301,9 @@ FIO_compressZstdFrame(FIO_ctx_t* const fCtx,
     unsigned inputPresented = 0;
     unsigned inputBlocked = 0;
     unsigned lastJobID = 0;
+    UTIL_time_t lastAdaptTime = UTIL_getTime();
+    U64 const adaptEveryMicro = REFRESH_RATE;
+
     UTIL_HumanReadableSize_t const file_hrs = UTIL_makeHumanReadableSize(fileSize);
 
     DISPLAYLEVEL(6, "compression using zstd format \n");
@@ -1369,14 +1372,106 @@ FIO_compressZstdFrame(FIO_ctx_t* const fCtx,
                 compressedfilesize += outBuff.pos;
             }
 
-            /* display notification; and adapt compression level */
-            if (READY_FOR_UPDATE()) {
+            /* adaptive mode : statistics measurement and speed correction */
+            if (prefs->adaptiveMode && UTIL_clockSpanMicro(lastAdaptTime) > adaptEveryMicro) {
+                ZSTD_frameProgression const zfp = ZSTD_getFrameProgression(ress.cctx);
+
+                lastAdaptTime = UTIL_getTime();
+
+                /* check output speed */
+                if (zfp.currentJobID > 1) {  /* only possible if nbWorkers >= 1 */
+
+                    unsigned long long newlyProduced = zfp.produced - previous_zfp_update.produced;
+                    unsigned long long newlyFlushed = zfp.flushed - previous_zfp_update.flushed;
+                    assert(zfp.produced >= previous_zfp_update.produced);
+                    assert(prefs->nbWorkers >= 1);
+
+                    /* test if compression is blocked
+                        * either because output is slow and all buffers are full
+                        * or because input is slow and no job can start while waiting for at least one buffer to be filled.
+                        * note : exclude starting part, since currentJobID > 1 */
+                    if ( (zfp.consumed == previous_zfp_update.consumed)   /* no data compressed : no data available, or no more buffer to compress to, OR compression is really slow (compression of a single block is slower than update rate)*/
+                        && (zfp.nbActiveWorkers == 0)                       /* confirmed : no compression ongoing */
+                        ) {
+                        DISPLAYLEVEL(6, "all buffers full : compression stopped => slow down \n")
+                        speedChange = slower;
+                    }
+
+                    previous_zfp_update = zfp;
+
+                    if ( (newlyProduced > (newlyFlushed * 9 / 8))   /* compression produces more data than output can flush (though production can be spiky, due to work unit : (N==4)*block sizes) */
+                        && (flushWaiting == 0)                        /* flush speed was never slowed by lack of production, so it's operating at max capacity */
+                        ) {
+                        DISPLAYLEVEL(6, "compression faster than flush (%llu > %llu), and flushed was never slowed down by lack of production => slow down \n", newlyProduced, newlyFlushed);
+                        speedChange = slower;
+                    }
+                    flushWaiting = 0;
+                }
+
+                /* course correct only if there is at least one new job completed */
+                if (zfp.currentJobID > lastJobID) {
+                    DISPLAYLEVEL(6, "compression level adaptation check \n")
+
+                    /* check input speed */
+                    if (zfp.currentJobID > (unsigned)(prefs->nbWorkers+1)) {   /* warm up period, to fill all workers */
+                        if (inputBlocked <= 0) {
+                            DISPLAYLEVEL(6, "input is never blocked => input is slower than ingestion \n");
+                            speedChange = slower;
+                        } else if (speedChange == noChange) {
+                            unsigned long long newlyIngested = zfp.ingested - previous_zfp_correction.ingested;
+                            unsigned long long newlyConsumed = zfp.consumed - previous_zfp_correction.consumed;
+                            unsigned long long newlyProduced = zfp.produced - previous_zfp_correction.produced;
+                            unsigned long long newlyFlushed  = zfp.flushed  - previous_zfp_correction.flushed;
+                            previous_zfp_correction = zfp;
+                            assert(inputPresented > 0);
+                            DISPLAYLEVEL(6, "input blocked %u/%u(%.2f) - ingested:%u vs %u:consumed - flushed:%u vs %u:produced \n",
+                                            inputBlocked, inputPresented, (double)inputBlocked/inputPresented*100,
+                                            (unsigned)newlyIngested, (unsigned)newlyConsumed,
+                                            (unsigned)newlyFlushed, (unsigned)newlyProduced);
+                            if ( (inputBlocked > inputPresented / 8)     /* input is waiting often, because input buffers is full : compression or output too slow */
+                                && (newlyFlushed * 33 / 32 > newlyProduced)  /* flush everything that is produced */
+                                && (newlyIngested * 33 / 32 > newlyConsumed) /* input speed as fast or faster than compression speed */
+                            ) {
+                                DISPLAYLEVEL(6, "recommend faster as in(%llu) >= (%llu)comp(%llu) <= out(%llu) \n",
+                                                newlyIngested, newlyConsumed, newlyProduced, newlyFlushed);
+                                speedChange = faster;
+                            }
+                        }
+                        inputBlocked = 0;
+                        inputPresented = 0;
+                    }
+
+                    if (speedChange == slower) {
+                        DISPLAYLEVEL(6, "slower speed , higher compression \n")
+                        compressionLevel ++;
+                        if (compressionLevel > ZSTD_maxCLevel()) compressionLevel = ZSTD_maxCLevel();
+                        if (compressionLevel > prefs->maxAdaptLevel) compressionLevel = prefs->maxAdaptLevel;
+                        compressionLevel += (compressionLevel == 0);   /* skip 0 */
+                        ZSTD_CCtx_setParameter(ress.cctx, ZSTD_c_compressionLevel, compressionLevel);
+                    }
+                    if (speedChange == faster) {
+                        DISPLAYLEVEL(6, "faster speed , lighter compression \n")
+                        compressionLevel --;
+                        if (compressionLevel < prefs->minAdaptLevel) compressionLevel = prefs->minAdaptLevel;
+                        compressionLevel -= (compressionLevel == 0);   /* skip 0 */
+                        ZSTD_CCtx_setParameter(ress.cctx, ZSTD_c_compressionLevel, compressionLevel);
+                    }
+                    speedChange = noChange;
+
+                    lastJobID = zfp.currentJobID;
+                }  /* if (zfp.currentJobID > lastJobID) */
+            } /* if (prefs->adaptiveMode && UTIL_clockSpanMicro(lastAdaptTime) > adaptEveryMicro) */
+
+            /* display notification */
+            if (SHOULD_DISPLAY_PROGRESS() && READY_FOR_UPDATE()) {
                 ZSTD_frameProgression const zfp = ZSTD_getFrameProgression(ress.cctx);
                 double const cShare = (double)zfp.produced / (double)(zfp.consumed + !zfp.consumed/*avoid div0*/) * 100;
                 UTIL_HumanReadableSize_t const buffered_hrs = UTIL_makeHumanReadableSize(zfp.ingested - zfp.consumed);
                 UTIL_HumanReadableSize_t const consumed_hrs = UTIL_makeHumanReadableSize(zfp.consumed);
                 UTIL_HumanReadableSize_t const produced_hrs = UTIL_makeHumanReadableSize(zfp.produced);
 
+                DELAY_NEXT_UPDATE();
+
                 /* display progress notifications */
                 DISPLAY_PROGRESS("\r%79s\r", "");    /* Clear out the current displayed line */
                 if (g_display_prefs.displayLevel >= 3) {
@@ -1406,96 +1501,8 @@ FIO_compressZstdFrame(FIO_ctx_t* const fCtx,
                     if (fileSize != UTIL_FILESIZE_UNKNOWN)
                         DISPLAY_PROGRESS("/%6.*f%4s", file_hrs.precision, file_hrs.value, file_hrs.suffix);
                     DISPLAY_PROGRESS(" ==> %2.f%%", cShare);
-                    DELAY_NEXT_UPDATE();
                 }
-
-                /* adaptive mode : statistics measurement and speed correction */
-                if (prefs->adaptiveMode) {
-
-                    /* check output speed */
-                    if (zfp.currentJobID > 1) {  /* only possible if nbWorkers >= 1 */
-
-                        unsigned long long newlyProduced = zfp.produced - previous_zfp_update.produced;
-                        unsigned long long newlyFlushed = zfp.flushed - previous_zfp_update.flushed;
-                        assert(zfp.produced >= previous_zfp_update.produced);
-                        assert(prefs->nbWorkers >= 1);
-
-                        /* test if compression is blocked
-                         * either because output is slow and all buffers are full
-                         * or because input is slow and no job can start while waiting for at least one buffer to be filled.
-                         * note : exclude starting part, since currentJobID > 1 */
-                        if ( (zfp.consumed == previous_zfp_update.consumed)   /* no data compressed : no data available, or no more buffer to compress to, OR compression is really slow (compression of a single block is slower than update rate)*/
-                          && (zfp.nbActiveWorkers == 0)                       /* confirmed : no compression ongoing */
-                          ) {
-                            DISPLAYLEVEL(6, "all buffers full : compression stopped => slow down \n")
-                            speedChange = slower;
-                        }
-
-                        previous_zfp_update = zfp;
-
-                        if ( (newlyProduced > (newlyFlushed * 9 / 8))   /* compression produces more data than output can flush (though production can be spiky, due to work unit : (N==4)*block sizes) */
-                          && (flushWaiting == 0)                        /* flush speed was never slowed by lack of production, so it's operating at max capacity */
-                          ) {
-                            DISPLAYLEVEL(6, "compression faster than flush (%llu > %llu), and flushed was never slowed down by lack of production => slow down \n", newlyProduced, newlyFlushed);
-                            speedChange = slower;
-                        }
-                        flushWaiting = 0;
-                    }
-
-                    /* course correct only if there is at least one new job completed */
-                    if (zfp.currentJobID > lastJobID) {
-                        DISPLAYLEVEL(6, "compression level adaptation check \n")
-
-                        /* check input speed */
-                        if (zfp.currentJobID > (unsigned)(prefs->nbWorkers+1)) {   /* warm up period, to fill all workers */
-                            if (inputBlocked <= 0) {
-                                DISPLAYLEVEL(6, "input is never blocked => input is slower than ingestion \n");
-                                speedChange = slower;
-                            } else if (speedChange == noChange) {
-                                unsigned long long newlyIngested = zfp.ingested - previous_zfp_correction.ingested;
-                                unsigned long long newlyConsumed = zfp.consumed - previous_zfp_correction.consumed;
-                                unsigned long long newlyProduced = zfp.produced - previous_zfp_correction.produced;
-                                unsigned long long newlyFlushed  = zfp.flushed  - previous_zfp_correction.flushed;
-                                previous_zfp_correction = zfp;
-                                assert(inputPresented > 0);
-                                DISPLAYLEVEL(6, "input blocked %u/%u(%.2f) - ingested:%u vs %u:consumed - flushed:%u vs %u:produced \n",
-                                                inputBlocked, inputPresented, (double)inputBlocked/inputPresented*100,
-                                                (unsigned)newlyIngested, (unsigned)newlyConsumed,
-                                                (unsigned)newlyFlushed, (unsigned)newlyProduced);
-                                if ( (inputBlocked > inputPresented / 8)     /* input is waiting often, because input buffers is full : compression or output too slow */
-                                  && (newlyFlushed * 33 / 32 > newlyProduced)  /* flush everything that is produced */
-                                  && (newlyIngested * 33 / 32 > newlyConsumed) /* input speed as fast or faster than compression speed */
-                                ) {
-                                    DISPLAYLEVEL(6, "recommend faster as in(%llu) >= (%llu)comp(%llu) <= out(%llu) \n",
-                                                    newlyIngested, newlyConsumed, newlyProduced, newlyFlushed);
-                                    speedChange = faster;
-                                }
-                            }
-                            inputBlocked = 0;
-                            inputPresented = 0;
-                        }
-
-                        if (speedChange == slower) {
-                            DISPLAYLEVEL(6, "slower speed , higher compression \n")
-                            compressionLevel ++;
-                            if (compressionLevel > ZSTD_maxCLevel()) compressionLevel = ZSTD_maxCLevel();
-                            if (compressionLevel > prefs->maxAdaptLevel) compressionLevel = prefs->maxAdaptLevel;
-                            compressionLevel += (compressionLevel == 0);   /* skip 0 */
-                            ZSTD_CCtx_setParameter(ress.cctx, ZSTD_c_compressionLevel, compressionLevel);
-                        }
-                        if (speedChange == faster) {
-                            DISPLAYLEVEL(6, "faster speed , lighter compression \n")
-                            compressionLevel --;
-                            if (compressionLevel < prefs->minAdaptLevel) compressionLevel = prefs->minAdaptLevel;
-                            compressionLevel -= (compressionLevel == 0);   /* skip 0 */
-                            ZSTD_CCtx_setParameter(ress.cctx, ZSTD_c_compressionLevel, compressionLevel);
-                        }
-                        speedChange = noChange;
-
-                        lastJobID = zfp.currentJobID;
-                    }  /* if (zfp.currentJobID > lastJobID) */
-                }  /* if (g_adaptiveMode) */
-            }  /* if (READY_FOR_UPDATE()) */
+            }  /* if (SHOULD_DISPLAY_PROGRESS() && READY_FOR_UPDATE()) */
         }  /* while ((inBuff.pos != inBuff.size) */
     } while (directive != ZSTD_e_end);
 
index b82d5b6b16b9b58c4c2c19a64a08d8d8d5986140..aec2e8d56faebc79e2cd1ead7ffe9d3c290885bc 100644 (file)
@@ -38,11 +38,11 @@ extern FIO_display_prefs_t g_display_prefs;
 extern UTIL_time_t g_displayClock;
 
 #define REFRESH_RATE  ((U64)(SEC_TO_MICRO / 6))
-#define READY_FOR_UPDATE() ((g_display_prefs.progressSetting != FIO_ps_never) && UTIL_clockSpanMicro(g_displayClock) > REFRESH_RATE)
+#define READY_FOR_UPDATE() (UTIL_clockSpanMicro(g_displayClock) > REFRESH_RATE || g_display_prefs.displayLevel >= 4)
 #define DELAY_NEXT_UPDATE() { g_displayClock = UTIL_getTime(); }
 #define DISPLAYUPDATE(l, ...) {                              \
         if (g_display_prefs.displayLevel>=l && (g_display_prefs.progressSetting != FIO_ps_never)) { \
-            if (READY_FOR_UPDATE() || (g_display_prefs.displayLevel>=4)) { \
+            if (READY_FOR_UPDATE()) { \
                 DELAY_NEXT_UPDATE();                         \
                 DISPLAY(__VA_ARGS__);                        \
                 if (g_display_prefs.displayLevel>=4) fflush(stderr);       \
index 564e955b5ea0a68a2a36f97a593b2983a256208c..30b9afaa03bd225194e115898a677cfec7971df1 100755 (executable)
@@ -4,3 +4,11 @@ set -e
 
 # Test --adapt
 zstd -f file --adapt -c | zstd -t
+
+datagen -g100M > file100M
+
+# Pick parameters to force fast adaptation, even on slow systems
+zstd --adapt -vvvv -19 --zstd=wlog=10 file100M -o /dev/null 2>&1 | grep -q "faster speed , lighter compression"
+
+# Adaption still happens with --no-progress
+zstd --no-progress --adapt -vvvv -19 --zstd=wlog=10 file100M -o /dev/null 2>&1 | grep -q "faster speed , lighter compression"