]> git.ipfire.org Git - thirdparty/zstd.git/commitdiff
created ZSTDMT_toFlushNow()
authorYann Collet <cyan@fb.com>
Sat, 18 Aug 2018 01:11:54 +0000 (18:11 -0700)
committerYann Collet <cyan@fb.com>
Sat, 18 Aug 2018 01:11:54 +0000 (18:11 -0700)
tells in a non-blocking way if there is something ready to flush right now.
only works with multi-threading for the time being.

Useful to know if flush speed will be limited by lack of production.

lib/compress/zstd_compress.c
lib/compress/zstdmt_compress.c
lib/compress/zstdmt_compress.h
lib/zstd.h
programs/fileio.c

index 4bc18d2ec1c6441fc8e24a207f824eeefbc17720..34d4f5b0f54ffa2702a5434ab7ae1b2249505f4f 100644 (file)
@@ -906,6 +906,20 @@ ZSTD_frameProgression ZSTD_getFrameProgression(const ZSTD_CCtx* cctx)
         return fp;
 }   }
 
+/*! ZSTD_toFlushNow()
+ *  Only useful for multithreading scenarios currently (nbWorkers >= 1).
+ */
+size_t ZSTD_toFlushNow(ZSTD_CCtx* cctx)
+{
+#ifdef ZSTD_MULTITHREAD
+    if (cctx->appliedParams.nbWorkers > 0) {
+        return ZSTDMT_toFlushNow(cctx->mtctx);
+    }
+#endif
+    return 0;   /* over-simplification; could also check if context is currently running in streaming mode, and in which case, report how many bytes are left to be flushed within output buffer */
+}
+
+
 
 static U32 ZSTD_equivalentCParams(ZSTD_compressionParameters cParams1,
                                   ZSTD_compressionParameters cParams2)
index b61cd50ee9499f15f71930129d6e67d327b1876b..cc1af58e66221030d0449c26afcb16ff5429e95c 100644 (file)
@@ -1096,20 +1096,45 @@ ZSTD_frameProgression ZSTDMT_getFrameProgression(ZSTDMT_CCtx* mtctx)
             ZSTD_pthread_mutex_lock(&jobPtr->job_mutex);
             {   size_t const cResult = jobPtr->cSize;
                 size_t const produced = ZSTD_isError(cResult) ? 0 : cResult;
+                size_t const flushed = ZSTD_isError(cResult) ? 0 : jobPtr->dstFlushed;
+                assert(flushed <= produced);
                 fps.ingested += jobPtr->src.size;
                 fps.consumed += jobPtr->consumed;
                 fps.produced += produced;
-                fps.flushed  += jobPtr->dstFlushed;
+                fps.flushed  += flushed;
                 fps.nbActiveWorkers += (jobPtr->consumed < jobPtr->src.size);
             }
             ZSTD_pthread_mutex_unlock(&mtctx->jobs[wJobID].job_mutex);
         }
     }
-    DEBUGLOG(5, "ZSTDMT_getFrameProgression : completed");
     return fps;
 }
 
 
+size_t ZSTDMT_toFlushNow(ZSTDMT_CCtx* mtctx)
+{
+    size_t toFlush;
+    unsigned const jobID = mtctx->doneJobID;
+    assert(jobID <= mtctx->nextJobID);
+    if (jobID == mtctx->nextJobID) return 0;   /* no active job => nothing to flush */
+
+    {   unsigned const wJobID = jobID & mtctx->jobIDMask;
+        ZSTDMT_jobDescription* jobPtr = &mtctx->jobs[wJobID];
+        ZSTD_pthread_mutex_lock(&jobPtr->job_mutex);
+        {   size_t const cResult = jobPtr->cSize;
+            size_t const produced = ZSTD_isError(cResult) ? 0 : cResult;
+            size_t const flushed = ZSTD_isError(cResult) ? 0 : jobPtr->dstFlushed;
+            assert(flushed <= produced);
+            toFlush = produced - flushed;
+            if (toFlush==0) assert(jobPtr->consumed < jobPtr->src.size);   /* if toFlush==0, doneJobID should still be active: if doneJobID is completed and fully flushed, ZSTDMT_flushProduced() should have already moved to next job */
+        }
+        ZSTD_pthread_mutex_unlock(&mtctx->jobs[wJobID].job_mutex);
+    }
+
+    return toFlush;
+}
+
+
 /* ------------------------------------------ */
 /* =====   Multi-threaded compression   ===== */
 /* ------------------------------------------ */
index 34a475a42bffc0ed216b1f33e947270930375916..12ad9f899b57648b1466d17eb2494829463ef8e8 100644 (file)
@@ -119,11 +119,21 @@ ZSTDLIB_API size_t ZSTDMT_compressStream_generic(ZSTDMT_CCtx* mtctx,
  * ===  Not exposed in libzstd. Never invoke directly   ===
  * ======================================================== */
 
+ /*! ZSTDMT_toFlushNow()
+  *  Tell how many bytes are ready to be flushed immediately.
+  *  Probe the oldest active job (not yet entirely flushed) and check its output buffer.
+  *  If return 0, it means there is no active job,
+  *  or, it means oldest job is still active, but everything produced has been flushed so far,
+  *  therefore flushing is limited by speed of oldest job. */
+size_t ZSTDMT_toFlushNow(ZSTDMT_CCtx* mtctx);
+
+/*! ZSTDMT_CCtxParam_setMTCtxParameter()
+ *  like ZSTDMT_setMTCtxParameter(), but into a ZSTD_CCtx_Params */
 size_t ZSTDMT_CCtxParam_setMTCtxParameter(ZSTD_CCtx_params* params, ZSTDMT_parameter parameter, unsigned value);
 
-/* ZSTDMT_CCtxParam_setNbWorkers()
- * Set nbWorkers, and clamp it.
- * Also reset jobSize and overlapLog */
+/*! ZSTDMT_CCtxParam_setNbWorkers()
+ *  Set nbWorkers, and clamp it.
+ *  Also reset jobSize and overlapLog */
 size_t ZSTDMT_CCtxParam_setNbWorkers(ZSTD_CCtx_params* params, unsigned nbWorkers);
 
 /*! ZSTDMT_updateCParams_whileCompressing() :
@@ -131,9 +141,9 @@ size_t ZSTDMT_CCtxParam_setNbWorkers(ZSTD_CCtx_params* params, unsigned nbWorker
  *  New parameters will be applied to next compression job. */
 void ZSTDMT_updateCParams_whileCompressing(ZSTDMT_CCtx* mtctx, const ZSTD_CCtx_params* cctxParams);
 
-/* ZSTDMT_getFrameProgression():
- * tells how much data has been consumed (input) and produced (output) for current frame.
- * able to count progression inside worker threads.
+/*! ZSTDMT_getFrameProgression():
+ *  tells how much data has been consumed (input) and produced (output) for current frame.
+ *  able to count progression inside worker threads.
  */
 ZSTD_frameProgression ZSTDMT_getFrameProgression(ZSTDMT_CCtx* mtctx);
 
index 02e447b30965afea619f912e9df11db152360e23..edb107c2b753bbda0af7a02b419a46890a5ff131 100644 (file)
@@ -746,7 +746,16 @@ typedef struct {
  * Therefore, (ingested - consumed) is amount of input data buffered internally, not yet compressed.
  * Can report progression inside worker threads (multi-threading and non-blocking mode).
  */
-ZSTD_frameProgression ZSTD_getFrameProgression(const ZSTD_CCtx* cctx);
+ZSTDLIB_API ZSTD_frameProgression ZSTD_getFrameProgression(const ZSTD_CCtx* cctx);
+
+/*! ZSTD_toFlushNow()
+ *  Tell how many bytes are ready to be flushed immediately.
+ *  Useful for multithreading scenarios (nbWorkers >= 1).
+ *  Probe the oldest active job (not yet entirely flushed) and check its output buffer.
+ *  If return 0, it means there is no active job, or
+ *  it means oldest job is still active, but everything produced has been flushed so far,
+ *  therefore flushing is limited by speed of oldest job. */
+ZSTDLIB_API size_t ZSTD_toFlushNow(ZSTD_CCtx* cctx);
 
 
 
index fcb43030ab4e88134828b770880e7ceb208b4da1..aeacd0440be9aca90e8f867387d34714a26fbe0b 100644 (file)
@@ -747,6 +747,7 @@ FIO_compressZstdFrame(const cRess_t* ressPtr,
     /* stats */
     typedef enum { noChange, slower, faster } speedChange_e;
     speedChange_e speedChange = noChange;
+    unsigned flushWaiting = 0;
     unsigned inputPresented = 0;
     unsigned inputBlocked = 0;
     unsigned lastJobID = 0;
@@ -777,11 +778,13 @@ FIO_compressZstdFrame(const cRess_t* ressPtr,
 
             size_t const oldIPos = inBuff.pos;
             ZSTD_outBuffer outBuff = { ress.dstBuffer, ress.dstBufferSize, 0 };
+            size_t const toFlushNow = ZSTD_toFlushNow(ress.cctx);
             CHECK_V(stillToFlush, ZSTD_compress_generic(ress.cctx, &outBuff, &inBuff, directive));
 
             /* count stats */
             inputPresented++;
             if (oldIPos == inBuff.pos) inputBlocked++;
+            if (!toFlushNow) flushWaiting = 1;
 
             /* Write compressed stream */
             DISPLAYLEVEL(6, "ZSTD_compress_generic(end:%u) => input pos(%u)<=(%u)size ; output generated %u bytes \n",
@@ -817,11 +820,13 @@ FIO_compressZstdFrame(const cRess_t* ressPtr,
                         speedChange = slower;
                     }
 
-                    if ( (newlyProduced > (newlyFlushed * 9 / 8))
-                      && (stillToFlush > ZSTD_BLOCKSIZE_MAX) ) {
-                        DISPLAYLEVEL(6, "production faster than flushing (%llu > %llu) but there is still %u bytes to flush => slow down \n", newlyProduced, newlyFlushed, (U32)stillToFlush);
+                    if ( (newlyProduced > (newlyFlushed * 9 / 8))   /* compression produces more data than output can flush (though production can be spiky, due to work unit : (N==4)*block sizes) */
+                      && (flushWaiting == 0)                        /* flush speed was never slowed by lack of production, so it's operating at max capacity */
+                      ) {
+                        DISPLAYLEVEL(6, "compression faster than flush (%llu > %llu), and flushed was never slowed down by lack of production => slow down \n", newlyProduced, newlyFlushed);
                         speedChange = slower;
                     }
+                    flushWaiting = 0;
                 }
 
                 /* course correct only if there is at least one new job completed */