]> git.ipfire.org Git - thirdparty/zstd.git/commitdiff
frameProgression reports nbActiveWorkers and output flushed
authorYann Collet <cyan@fb.com>
Tue, 14 Aug 2018 18:49:25 +0000 (11:49 -0700)
committerYann Collet <cyan@fb.com>
Tue, 14 Aug 2018 18:49:25 +0000 (11:49 -0700)
lib/compress/zstd_compress.c
lib/compress/zstdmt_compress.c
lib/zstd.h
programs/fileio.c

index 1412c1d6abfe988ff40790a83ff10d5af7ca6e57..4bc18d2ec1c6441fc8e24a207f824eeefbc17720 100644 (file)
@@ -900,7 +900,9 @@ ZSTD_frameProgression ZSTD_getFrameProgression(const ZSTD_CCtx* cctx)
         fp.ingested = cctx->consumedSrcSize + buffered;
         fp.consumed = cctx->consumedSrcSize;
         fp.produced = cctx->producedCSize;
+        fp.flushed  = cctx->producedCSize;   /* simplified; some data might still be left within streaming output buffer */
         fp.currentJobID = 0;
+        fp.nbActiveWorkers = 0;
         return fp;
 }   }
 
index 49502bd0d228b851c7c14b164a06e74dc9ec685b..b61cd50ee9499f15f71930129d6e67d327b1876b 100644 (file)
@@ -1058,7 +1058,7 @@ static size_t ZSTDMT_resize(ZSTDMT_CCtx* mtctx, unsigned nbWorkers)
 
 
 /*! ZSTDMT_updateCParams_whileCompressing() :
- *  Updates only a selected set of compression parameters, to remain compatible with current frame.
+ *  Updates a selected set of compression parameters, remaining compatible with currently active frame.
  *  New parameters will be applied to next compression job. */
 void ZSTDMT_updateCParams_whileCompressing(ZSTDMT_CCtx* mtctx, const ZSTD_CCtx_params* cctxParams)
 {
@@ -1076,27 +1076,31 @@ void ZSTDMT_updateCParams_whileCompressing(ZSTDMT_CCtx* mtctx, const ZSTD_CCtx_p
 /* ZSTDMT_getFrameProgression():
  * tells how much data has been consumed (input) and produced (output) for current frame.
  * able to count progression inside worker threads.
- * Note : mutex will be acquired during statistics collection. */
+ * Note : mutex will be acquired during statistics collection inside workers. */
 ZSTD_frameProgression ZSTDMT_getFrameProgression(ZSTDMT_CCtx* mtctx)
 {
     ZSTD_frameProgression fps;
     DEBUGLOG(5, "ZSTDMT_getFrameProgression");
     fps.ingested = mtctx->consumed + mtctx->inBuff.filled;
     fps.consumed = mtctx->consumed;
-    fps.produced = mtctx->produced;
+    fps.produced = fps.flushed = mtctx->produced;
     fps.currentJobID = mtctx->nextJobID;
+    fps.nbActiveWorkers = 0;
     {   unsigned jobNb;
         unsigned lastJobNb = mtctx->nextJobID + mtctx->jobReady; assert(mtctx->jobReady <= 1);
         DEBUGLOG(6, "ZSTDMT_getFrameProgression: jobs: from %u to <%u (jobReady:%u)",
                     mtctx->doneJobID, lastJobNb, mtctx->jobReady)
         for (jobNb = mtctx->doneJobID ; jobNb < lastJobNb ; jobNb++) {
             unsigned const wJobID = jobNb & mtctx->jobIDMask;
-            ZSTD_pthread_mutex_lock(&mtctx->jobs[wJobID].job_mutex);
-            {   size_t const cResult = mtctx->jobs[wJobID].cSize;
+            ZSTDMT_jobDescription* jobPtr = &mtctx->jobs[wJobID];
+            ZSTD_pthread_mutex_lock(&jobPtr->job_mutex);
+            {   size_t const cResult = jobPtr->cSize;
                 size_t const produced = ZSTD_isError(cResult) ? 0 : cResult;
-                fps.ingested += mtctx->jobs[wJobID].src.size;
-                fps.consumed += mtctx->jobs[wJobID].consumed;
+                fps.ingested += jobPtr->src.size;
+                fps.consumed += jobPtr->consumed;
                 fps.produced += produced;
+                fps.flushed  += jobPtr->dstFlushed;
+                fps.nbActiveWorkers += (jobPtr->consumed < jobPtr->src.size);
             }
             ZSTD_pthread_mutex_unlock(&mtctx->jobs[wJobID].job_mutex);
         }
index edd0079c957a6be9c1d71fe4d34252c3e6fdcdb0..02e447b30965afea619f912e9df11db152360e23 100644 (file)
@@ -732,10 +732,12 @@ ZSTDLIB_API size_t ZSTD_resetCStream(ZSTD_CStream* zcs, unsigned long long pledg
 
 
 typedef struct {
-    unsigned long long ingested;
-    unsigned long long consumed;
-    unsigned long long produced;
-    unsigned currentJobID;
+    unsigned long long ingested;   /* nb input bytes read and buffered */
+    unsigned long long consumed;   /* nb input bytes actually compressed */
+    unsigned long long produced;   /* nb of compressed bytes generated and buffered */
+    unsigned long long flushed;    /* nb of compressed bytes flushed : not provided; can be tracked from caller side */
+    unsigned currentJobID;         /* MT only : latest started job nb */
+    unsigned nbActiveWorkers;      /* MT only : nb of workers actively compressing at probe time */
 } ZSTD_frameProgression;
 
 /* ZSTD_getFrameProgression():
index b2993619141b54f8fbfb36517d716a4095da641e..68b2f1593b9b32d10dd16a96b22f693248ca7226 100644 (file)
@@ -800,16 +800,17 @@ FIO_compressZstdFrame(const cRess_t* ressPtr,
 
                 /* check output speed */
                 if (zfp.currentJobID > 1) {
-                    static ZSTD_frameProgression cpszfp = { 0, 0, 0, 0 };
-                    static unsigned long long lastFlushedSize = 0;
+                    static ZSTD_frameProgression cpszfp = { 0, 0, 0, 0, 0, 0 };
 
                     unsigned long long newlyProduced = zfp.produced - cpszfp.produced;
-                    unsigned long long newlyFlushed = compressedfilesize - lastFlushedSize;
+                    unsigned long long newlyFlushed = zfp.flushed - cpszfp.flushed;
                     assert(zfp.produced >= cpszfp.produced);
 
+                    cpszfp = zfp;
+
                     if ( (zfp.ingested == cpszfp.ingested)
                       && (zfp.consumed == cpszfp.consumed) ) {
-                        DISPLAYLEVEL(6, "no data read nor consumed : buffers are full (?) or compression is slow + input has reached its limit. If buffers full : output is too slow => slow down \n")
+                        DISPLAYLEVEL(2, "no data read nor consumed : buffers are full (?) output is too slow => slow down ; or compression is slow + input has reached its limit => can't tell \n")
                         speedChange = slower;
                     }
 
@@ -818,8 +819,6 @@ FIO_compressZstdFrame(const cRess_t* ressPtr,
                         DISPLAYLEVEL(6, "production faster than flushing (%llu > %llu) but there is still %u bytes to flush => slow down \n", newlyProduced, newlyFlushed, (U32)stillToFlush);
                         speedChange = slower;
                     }
-                    cpszfp = zfp;
-                    lastFlushedSize = compressedfilesize;
                 }
 
                 /* course correct only if there is at least one new job completed */
@@ -832,14 +831,12 @@ FIO_compressZstdFrame(const cRess_t* ressPtr,
                             DISPLAYLEVEL(6, "input is never blocked => input is too slow \n");
                             speedChange = slower;
                         } else if (speedChange == noChange) {
-                            static ZSTD_frameProgression csuzfp = { 0, 0, 0, 0 };
-                            static unsigned long long lastFlushedSize = 0;
+                            static ZSTD_frameProgression csuzfp = { 0, 0, 0, 0, 0, 0 };
                             unsigned long long newlyIngested = zfp.ingested - csuzfp.ingested;
                             unsigned long long newlyConsumed = zfp.consumed - csuzfp.consumed;
                             unsigned long long newlyProduced = zfp.produced - csuzfp.produced;
-                            unsigned long long newlyFlushed = compressedfilesize - lastFlushedSize;
+                            unsigned long long newlyFlushed = zfp.flushed - csuzfp.flushed;
                             csuzfp = zfp;
-                            lastFlushedSize = compressedfilesize;
                             assert(inputPresented > 0);
                             DISPLAYLEVEL(6, "input blocked %u/%u(%.2f) - ingested:%u vs %u:consumed - flushed:%u vs %u:produced \n",
                                             inputBlocked, inputPresented, (double)inputBlocked/inputPresented*100,