]> git.ipfire.org Git - thirdparty/zstd.git/commitdiff
introduced command --adapt
authorYann Collet <cyan@fb.com>
Sun, 12 Aug 2018 03:48:06 +0000 (20:48 -0700)
committerYann Collet <cyan@fb.com>
Sun, 12 Aug 2018 03:48:06 +0000 (20:48 -0700)
lib/compress/zstd_compress.c
lib/compress/zstdmt_compress.c
programs/fileio.c
programs/fileio.h
programs/zstd.1.md
programs/zstdcli.c

index 2121fe74989ed33e1088708515c4ec9a4c2855f5..1412c1d6abfe988ff40790a83ff10d5af7ca6e57 100644 (file)
@@ -3704,6 +3704,7 @@ size_t ZSTD_compress_generic (ZSTD_CCtx* cctx,
               || (endOp == ZSTD_e_end && flushMin == 0) ) { /* compression completed */
                 ZSTD_CCtx_reset(cctx);
             }
+            DEBUGLOG(5, "completed ZSTD_compress_generic delegating to ZSTDMT_compressStream_generic");
             return flushMin;
     }   }
 #endif
index d2f06e4ee72777846eaab2a58ffeeaa53f5c00ad..49502bd0d228b851c7c14b164a06e74dc9ec685b 100644 (file)
@@ -249,8 +249,8 @@ static buffer_t ZSTDMT_resizeBuffer(ZSTDMT_bufferPool* bufPool, buffer_t buffer)
 /* store buffer for later re-use, up to pool capacity */
 static void ZSTDMT_releaseBuffer(ZSTDMT_bufferPool* bufPool, buffer_t buf)
 {
-    if (buf.start == NULL) return;   /* compatible with release on NULL */
     DEBUGLOG(5, "ZSTDMT_releaseBuffer");
+    if (buf.start == NULL) return;   /* compatible with release on NULL */
     ZSTD_pthread_mutex_lock(&bufPool->poolMutex);
     if (bufPool->nbBuffers < bufPool->totalBuffers) {
         bufPool->bTable[bufPool->nbBuffers++] = buf;  /* stored for later use */
@@ -541,6 +541,7 @@ static void ZSTDMT_serialState_update(serialState_t* serialState,
     /* Wait for our turn */
     ZSTD_PTHREAD_MUTEX_LOCK(&serialState->mutex);
     while (serialState->nextJobID < jobID) {
+        DEBUGLOG(5, "wait for serialState->cond");
         ZSTD_pthread_cond_wait(&serialState->cond, &serialState->mutex);
     }
     /* A future job may error and skip our job */
@@ -932,7 +933,7 @@ static void ZSTDMT_waitForAllJobsCompleted(ZSTDMT_CCtx* mtctx)
         unsigned const jobID = mtctx->doneJobID & mtctx->jobIDMask;
         ZSTD_PTHREAD_MUTEX_LOCK(&mtctx->jobs[jobID].job_mutex);
         while (mtctx->jobs[jobID].consumed < mtctx->jobs[jobID].src.size) {
-            DEBUGLOG(5, "waiting for jobCompleted signal from job %u", mtctx->doneJobID);   /* we want to block when waiting for data to flush */
+            DEBUGLOG(4, "waiting for jobCompleted signal from job %u", mtctx->doneJobID);   /* we want to block when waiting for data to flush */
             ZSTD_pthread_cond_wait(&mtctx->jobs[jobID].job_cond, &mtctx->jobs[jobID].job_mutex);
         }
         ZSTD_pthread_mutex_unlock(&mtctx->jobs[jobID].job_mutex);
@@ -1079,7 +1080,7 @@ void ZSTDMT_updateCParams_whileCompressing(ZSTDMT_CCtx* mtctx, const ZSTD_CCtx_p
 ZSTD_frameProgression ZSTDMT_getFrameProgression(ZSTDMT_CCtx* mtctx)
 {
     ZSTD_frameProgression fps;
-    DEBUGLOG(6, "ZSTDMT_getFrameProgression");
+    DEBUGLOG(5, "ZSTDMT_getFrameProgression");
     fps.ingested = mtctx->consumed + mtctx->inBuff.filled;
     fps.consumed = mtctx->consumed;
     fps.produced = mtctx->produced;
@@ -1100,6 +1101,7 @@ ZSTD_frameProgression ZSTDMT_getFrameProgression(ZSTDMT_CCtx* mtctx)
             ZSTD_pthread_mutex_unlock(&mtctx->jobs[wJobID].job_mutex);
         }
     }
+    DEBUGLOG(5, "ZSTDMT_getFrameProgression : completed");
     return fps;
 }
 
@@ -1576,7 +1578,7 @@ static size_t ZSTDMT_flushProduced(ZSTDMT_CCtx* mtctx, ZSTD_outBuffer* output, u
     /* try to flush something */
     {   size_t cSize = mtctx->jobs[wJobID].cSize;                  /* shared */
         size_t const srcConsumed = mtctx->jobs[wJobID].consumed;   /* shared */
-        size_t const srcSize = mtctx->jobs[wJobID].src.size;        /* read-only, could be done after mutex lock, but no-declaration-after-statement */
+        size_t const srcSize = mtctx->jobs[wJobID].src.size;       /* read-only, could be done after mutex lock, but no-declaration-after-statement */
         ZSTD_pthread_mutex_unlock(&mtctx->jobs[wJobID].job_mutex);
         if (ZSTD_isError(cSize)) {
             DEBUGLOG(5, "ZSTDMT_flushProduced: job %u : compression error detected : %s",
@@ -1615,6 +1617,7 @@ static size_t ZSTDMT_flushProduced(ZSTDMT_CCtx* mtctx, ZSTD_outBuffer* output, u
                 DEBUGLOG(5, "Job %u completed (%u bytes), moving to next one",
                         mtctx->doneJobID, (U32)mtctx->jobs[wJobID].dstFlushed);
                 ZSTDMT_releaseBuffer(mtctx->bufPool, mtctx->jobs[wJobID].dstBuff);
+                DEBUGLOG(5, "dstBuffer released")
                 mtctx->jobs[wJobID].dstBuff = g_nullBuffer;
                 mtctx->jobs[wJobID].cSize = 0;   /* ensure this job slot is considered "not started" in future check */
                 mtctx->consumed += srcSize;
@@ -1691,6 +1694,7 @@ static int ZSTDMT_doesOverlapWindow(buffer_t buffer, ZSTD_window_t window)
     range_t extDict;
     range_t prefix;
 
+    DEBUGLOG(5, "ZSTDMT_doesOverlapWindow");
     extDict.start = window.dictBase + window.lowLimit;
     extDict.size = window.dictLimit - window.lowLimit;
 
@@ -1711,12 +1715,13 @@ static void ZSTDMT_waitForLdmComplete(ZSTDMT_CCtx* mtctx, buffer_t buffer)
 {
     if (mtctx->params.ldmParams.enableLdm) {
         ZSTD_pthread_mutex_t* mutex = &mtctx->serial.ldmWindowMutex;
+        DEBUGLOG(5, "ZSTDMT_waitForLdmComplete");
         DEBUGLOG(5, "source  [0x%zx, 0x%zx)",
                     (size_t)buffer.start,
                     (size_t)buffer.start + buffer.capacity);
         ZSTD_PTHREAD_MUTEX_LOCK(mutex);
         while (ZSTDMT_doesOverlapWindow(buffer, mtctx->serial.ldmWindow)) {
-            DEBUGLOG(6, "Waiting for LDM to finish...");
+            DEBUGLOG(5, "Waiting for LDM to finish...");
             ZSTD_pthread_cond_wait(&mtctx->serial.ldmWindowCond, mutex);
         }
         DEBUGLOG(6, "Done waiting for LDM to finish");
@@ -1736,6 +1741,7 @@ static int ZSTDMT_tryGetInputRange(ZSTDMT_CCtx* mtctx)
     size_t const target = mtctx->targetSectionSize;
     buffer_t buffer;
 
+    DEBUGLOG(5, "ZSTDMT_tryGetInputRange");
     assert(mtctx->inBuff.buffer.start == NULL);
     assert(mtctx->roundBuff.capacity >= target);
 
@@ -1749,7 +1755,7 @@ static int ZSTDMT_tryGetInputRange(ZSTDMT_CCtx* mtctx)
         buffer.start = start;
         buffer.capacity = prefixSize;
         if (ZSTDMT_isOverlapped(buffer, inUse)) {
-            DEBUGLOG(6, "Waiting for buffer...");
+            DEBUGLOG(5, "Waiting for buffer...");
             return 0;
         }
         ZSTDMT_waitForLdmComplete(mtctx, buffer);
@@ -1761,7 +1767,7 @@ static int ZSTDMT_tryGetInputRange(ZSTDMT_CCtx* mtctx)
     buffer.capacity = target;
 
     if (ZSTDMT_isOverlapped(buffer, inUse)) {
-        DEBUGLOG(6, "Waiting for buffer...");
+        DEBUGLOG(5, "Waiting for buffer...");
         return 0;
     }
     assert(!ZSTDMT_isOverlapped(buffer, mtctx->inBuff.prefix));
@@ -1834,8 +1840,10 @@ size_t ZSTDMT_compressStream_generic(ZSTDMT_CCtx* mtctx,
                 /* It is only possible for this operation to fail if there are
                  * still compression jobs ongoing.
                  */
+                DEBUGLOG(5, "ZSTDMT_tryGetInputRange failed")
                 assert(mtctx->doneJobID != mtctx->nextJobID);
-            }
+            } else
+                DEBUGLOG(5, "ZSTDMT_tryGetInputRange completed successfully : mtctx->inBuff.buffer.start = %p", mtctx->inBuff.buffer.start);
         }
         if (mtctx->inBuff.buffer.start != NULL) {
             size_t const toLoad = MIN(input->size - input->pos, mtctx->targetSectionSize - mtctx->inBuff.filled);
@@ -1863,6 +1871,7 @@ size_t ZSTDMT_compressStream_generic(ZSTDMT_CCtx* mtctx,
     /* check for potential compressed data ready to be flushed */
     {   size_t const remainingToFlush = ZSTDMT_flushProduced(mtctx, output, !forwardInputProgress, endOp); /* block if there was no forward input progress */
         if (input->pos < input->size) return MAX(remainingToFlush, 1);  /* input not consumed : do not end flush yet */
+        DEBUGLOG(5, "end of ZSTDMT_compressStream_generic: remainingToFlush = %u", (U32)remainingToFlush);
         return remainingToFlush;
     }
 }
index c12126168f3364108b9f6dbf9e2b3a0771398da4..89ee524b3de7ade7ad8be091ce618a6152718c2f 100644 (file)
@@ -226,6 +226,8 @@ void FIO_setOverlapLog(unsigned overlapLog){
         DISPLAYLEVEL(2, "Setting overlapLog is useless in single-thread mode \n");
     g_overlapLog = overlapLog;
 }
+static U32 g_adaptiveMode = 0;
+void FIO_setAdaptiveMode(unsigned adapt) { g_adaptiveMode = adapt; }
 static U32 g_ldmFlag = 0;
 void FIO_setLdmFlag(unsigned ldmFlag) {
     g_ldmFlag = (ldmFlag>0);
@@ -738,12 +740,12 @@ FIO_compressZstdFrame(const cRess_t* ressPtr,
     U64 compressedfilesize = 0;
     ZSTD_EndDirective directive = ZSTD_e_continue;
 
+    /* stats */
     typedef enum { noChange, slower, faster } speedChange_e;
     speedChange_e speedChange = noChange;
+    unsigned inputPresented = 0;
     unsigned inputBlocked = 0;
     unsigned lastJobID = 0;
-    unsigned long long lastProduced = 0;
-    unsigned long long lastFlushedSize = 0;
 
     DISPLAYLEVEL(6, "compression using zstd format \n");
 
@@ -774,6 +776,7 @@ FIO_compressZstdFrame(const cRess_t* ressPtr,
             CHECK_V(stillToFlush, ZSTD_compress_generic(ress.cctx, &outBuff, &inBuff, directive));
 
             /* count stats */
+            inputPresented++;
             if (oldIPos == inBuff.pos) inputBlocked++;
 
             /* Write compressed stream */
@@ -792,41 +795,74 @@ FIO_compressZstdFrame(const cRess_t* ressPtr,
                 double const cShare = (double)zfp.produced / (zfp.consumed + !zfp.consumed/*avoid div0*/) * 100;
 
                 /* check output speed */
-                if (zfp.currentJobID > 0) {
-                    unsigned long long newlyProduced = zfp.produced - lastProduced;
+                if (zfp.currentJobID > 1) {
+                    static ZSTD_frameProgression cpszfp = { 0, 0, 0, 0 };
+                    static unsigned long long lastFlushedSize = 0;
+
+                    unsigned long long newlyProduced = zfp.produced - cpszfp.produced;
                     unsigned long long newlyFlushed = compressedfilesize - lastFlushedSize;
-                    assert(zfp.produced >= lastProduced);
-                    if (newlyProduced == 0) {
-                        DISPLAYLEVEL(6, "no more data compression generation => buffers are full, compression waiting => output (or input) too slow \n")
+                    assert(zfp.produced >= cpszfp.produced);
+
+                    if ( (zfp.ingested == cpszfp.ingested)
+                      && (zfp.consumed == cpszfp.consumed) ) {
+                        DISPLAYLEVEL(6, "no data read nor consumed : buffers are full (?) or compression is slow + input has reached its limit. If buffers full : output is too slow => slow down \n")
                         speedChange = slower;
                     }
 
                     if ( (newlyProduced > (newlyFlushed * 9 / 8))
                       && (stillToFlush > ZSTD_BLOCKSIZE_MAX) ) {
-                        DISPLAYLEVEL(6, "production faster than flushing (%llu > %llu) \n", newlyProduced, newlyFlushed);
+                        DISPLAYLEVEL(6, "production faster than flushing (%llu > %llu) but there is still %u bytes to flush => slow down \n", newlyProduced, newlyFlushed, (U32)stillToFlush);
                         speedChange = slower;
                     }
-                    lastProduced = zfp.produced;
+                    cpszfp = zfp;
                     lastFlushedSize = compressedfilesize;
                 }
 
-                /* course correct only if there is at least one job completed */
+                /* course correct only if there is at least one new job completed */
                 if (zfp.currentJobID > lastJobID) {
                     DISPLAYLEVEL(6, "compression level adaptation check \n")
 
                     /* check input speed */
                     if (zfp.currentJobID > g_nbWorkers+1) {   /* warm up period, to fill all workers */
-                        if (inputBlocked <= 1) {   /* small tolerance */
+                        if (inputBlocked <= 0) {
                             DISPLAYLEVEL(6, "input is never blocked => input is too slow \n");
                             speedChange = slower;
+                        } else if (speedChange == noChange) {
+                            static ZSTD_frameProgression csuzfp = { 0, 0, 0, 0 };
+                            static unsigned long long lastFlushedSize = 0;
+                            unsigned long long newlyIngested = zfp.ingested - csuzfp.ingested;
+                            unsigned long long newlyConsumed = zfp.consumed - csuzfp.consumed;
+                            unsigned long long newlyProduced = zfp.produced - csuzfp.produced;
+                            unsigned long long newlyFlushed = compressedfilesize - lastFlushedSize;
+                            csuzfp = zfp;
+                            lastFlushedSize = compressedfilesize;
+                            assert(inputPresented > 0);
+                            if ( (inputBlocked > inputPresented / 8)     /* input is waiting often, because input buffers is full : compression or output too slow */
+                              && (newlyFlushed * 17 / 16 > newlyProduced)  /* flush everything that is produced */
+                              && (newlyIngested * 17 / 16 > newlyConsumed) /* can't keep up with input speed */
+                            ) {
+                                DISPLAYLEVEL(6, "recommend faster as in(%llu) >= (%llu)comp(%llu) <= out(%llu) \n",
+                                                newlyIngested, newlyConsumed, newlyProduced, newlyFlushed);
+                                speedChange = faster;
+                            }
                         }
                         inputBlocked = 0;
+                        inputPresented = 0;
                     }
 
-                    if (speedChange == slower) {
-                        DISPLAYLEVEL(6, "slower speed , higher compression \n")
-                        compressionLevel ++;
-                        ZSTD_CCtx_setParameter(ress.cctx, ZSTD_p_compressionLevel, (unsigned)compressionLevel);
+                    if (g_adaptiveMode) {
+                        if (speedChange == slower) {
+                            DISPLAYLEVEL(6, "slower speed , higher compression \n")
+                            compressionLevel ++;
+                            compressionLevel += (compressionLevel == 0);   /* skip 0 */
+                            ZSTD_CCtx_setParameter(ress.cctx, ZSTD_p_compressionLevel, (unsigned)compressionLevel);
+                        }
+                        if (speedChange == faster) {
+                            DISPLAYLEVEL(6, "slower speed , higher compression \n")
+                            compressionLevel --;
+                            compressionLevel -= (compressionLevel == 0);   /* skip 0 */
+                            ZSTD_CCtx_setParameter(ress.cctx, ZSTD_p_compressionLevel, (unsigned)compressionLevel);
+                        }
                         speedChange = noChange;
                     }
                     lastJobID = zfp.currentJobID;
index 69c83f71dce3d375d6fc77bc2d01eef4c0203b53..f4946c78a7419b98ee1834a8086051ef4dff3865 100644 (file)
@@ -57,6 +57,7 @@ void FIO_setMemLimit(unsigned memLimit);
 void FIO_setNbWorkers(unsigned nbWorkers);
 void FIO_setBlockSize(unsigned blockSize);
 void FIO_setOverlapLog(unsigned overlapLog);
+void FIO_setAdaptiveMode(unsigned adapt);
 void FIO_setLdmFlag(unsigned ldmFlag);
 void FIO_setLdmHashLog(unsigned ldmHashLog);
 void FIO_setLdmMinMatch(unsigned ldmMinMatch);
index 055c5c2444f57c6d462d00ad0a353894a3fe68a9..b71d5d5bfd219716bb846244125984ab24439f5e 100644 (file)
@@ -102,6 +102,13 @@ the last one takes effect.
 
 * `-#`:
     `#` compression level \[1-19] (default: 3)
+* `--fast[=#]`:
+    switch to ultra-fast compression levels.
+    If `=#` is not present, it defaults to `1`.
+    The higher the value, the faster the compression speed,
+    at the cost of some compression ratio.
+    This setting overwrites compression level if one was set previously.
+    Similarly, if a compression level is set after `--fast`, it overrides it.
 * `--ultra`:
     unlocks high compression levels 20+ (maximum 22), using a lot more memory.
     Note that decompression will also require more memory when using these levels.
@@ -115,25 +122,23 @@ the last one takes effect.
 
     Note: If `windowLog` is set to larger than 27, `--long=windowLog` or
     `--memory=windowSize` needs to be passed to the decompressor.
-* `--fast[=#]`:
-    switch to ultra-fast compression levels.
-    If `=#` is not present, it defaults to `1`.
-    The higher the value, the faster the compression speed,
-    at the cost of some compression ratio.
-    This setting overwrites compression level if one was set previously.
-    Similarly, if a compression level is set after `--fast`, it overrides it.
-
 * `-T#`, `--threads=#`:
     Compress using `#` working threads (default: 1).
     If `#` is 0, attempt to detect and use the number of physical CPU cores.
     In all cases, the nb of threads is capped to ZSTDMT_NBTHREADS_MAX==200.
     This modifier does nothing if `zstd` is compiled without multithread support.
 * `--single-thread`:
-    Does not spawn a thread for compression, use caller thread instead.
-    This is the only available mode when multithread support is disabled.
-    In this mode, compression is serialized with I/O.
+    Does not spawn a thread for compression, use a single thread for both I/O and compression.
+    In this mode, compression is serialized with I/O, which is slightly slower.
     (This is different from `-T1`, which spawns 1 compression thread in parallel of I/O).
-    Single-thread mode also features lower memory usage.
+    This mode is the only one available when multithread support is disabled.
+    Single-thread mode features lower memory usage.
+    Final compressed result is slightly different from `-T1`.
+* `--adapt` :
+    `zstd` will dynamically adapt compression level to perceived I/O conditions.
+    The current compression level can be observed live by using command `-v`.
+    Works with multi-threading and `--long` mode.
+    Does not work with `--single-thread`.
 * `-D file`:
     use `file` as Dictionary to compress or decompress FILE(s)
 * `--no-dictID`:
index d5a2216d612694cb95fd9639d7cc6a5d207d4ec9..2e54b3b0f54167b9c008946ec8ef35b56763ebef 100644 (file)
@@ -135,6 +135,7 @@ static int usage_advanced(const char* programName)
 #ifndef ZSTD_NOCOMPRESS
     DISPLAY( "--ultra : enable levels beyond %i, up to %i (requires more memory)\n", ZSTDCLI_CLEVEL_MAX, ZSTD_maxCLevel());
     DISPLAY( "--long[=#]: enable long distance matching with given window log (default: %u)\n", g_defaultMaxWindowLog);
+    DISPLAY( "--adapt : automatically adapt compression level to I/O conditions \n");
     DISPLAY( "--fast[=#]: switch to ultra fast compression level (default: %u)\n", 1);
 #ifdef ZSTD_MULTITHREAD
     DISPLAY( " -T#    : spawns # compression threads (default: 1, 0==# cores) \n");
@@ -395,6 +396,7 @@ int main(int argCount, const char* argv[])
         ldmFlag = 0,
         main_pause = 0,
         nbWorkers = 0,
+        adapt = 0,
         nextArgumentIsOutFileName = 0,
         nextArgumentIsMaxDict = 0,
         nextArgumentIsDictID = 0,
@@ -511,6 +513,7 @@ int main(int argCount, const char* argv[])
                     if (!strcmp(argument, "--keep")) { FIO_setRemoveSrcFile(0); continue; }
                     if (!strcmp(argument, "--rm")) { FIO_setRemoveSrcFile(1); continue; }
                     if (!strcmp(argument, "--priority=rt")) { setRealTimePrio = 1; continue; }
+                    if (!strcmp(argument, "--adapt")) { adapt = 1; continue; }
                     if (!strcmp(argument, "--single-thread")) { nbWorkers = 0; singleThread = 1; continue; }
                     if (!strcmp(argument, "--format=zstd")) { suffix = ZSTD_EXTENSION; FIO_setCompressionType(FIO_zstdCompression); continue; }
 #ifdef ZSTD_GZCOMPRESS
@@ -935,17 +938,14 @@ int main(int argCount, const char* argv[])
 #ifndef ZSTD_NOCOMPRESS
         FIO_setNbWorkers(nbWorkers);
         FIO_setBlockSize((U32)blockSize);
+        if (g_overlapLog!=OVERLAP_LOG_DEFAULT) FIO_setOverlapLog(g_overlapLog);
         FIO_setLdmFlag(ldmFlag);
         FIO_setLdmHashLog(g_ldmHashLog);
         FIO_setLdmMinMatch(g_ldmMinMatch);
-        if (g_ldmBucketSizeLog != LDM_PARAM_DEFAULT) {
-            FIO_setLdmBucketSizeLog(g_ldmBucketSizeLog);
-        }
-        if (g_ldmHashEveryLog != LDM_PARAM_DEFAULT) {
-            FIO_setLdmHashEveryLog(g_ldmHashEveryLog);
-        }
+        if (g_ldmBucketSizeLog != LDM_PARAM_DEFAULT) FIO_setLdmBucketSizeLog(g_ldmBucketSizeLog);
+        if (g_ldmHashEveryLog != LDM_PARAM_DEFAULT) FIO_setLdmHashEveryLog(g_ldmHashEveryLog);
+        FIO_setAdaptiveMode(adapt);
 
-        if (g_overlapLog!=OVERLAP_LOG_DEFAULT) FIO_setOverlapLog(g_overlapLog);
         if ((filenameIdx==1) && outFileName)
           operationResult = FIO_compressFilename(outFileName, filenameTable[0], dictFileName, cLevel, &compressionParams);
         else