From: Yann Collet Date: Tue, 14 Aug 2018 18:49:25 +0000 (-0700) Subject: frameProgression reports nbActiveWorkers and output flushed X-Git-Tag: v1.3.6^2~19^2~17 X-Git-Url: http://git.ipfire.org/gitweb.cgi?a=commitdiff_plain;h=3e4617ef54a423d9c839462b79595162466f05e5;p=thirdparty%2Fzstd.git frameProgression reports nbActiveWorkers and output flushed --- diff --git a/lib/compress/zstd_compress.c b/lib/compress/zstd_compress.c index 1412c1d6a..4bc18d2ec 100644 --- a/lib/compress/zstd_compress.c +++ b/lib/compress/zstd_compress.c @@ -900,7 +900,9 @@ ZSTD_frameProgression ZSTD_getFrameProgression(const ZSTD_CCtx* cctx) fp.ingested = cctx->consumedSrcSize + buffered; fp.consumed = cctx->consumedSrcSize; fp.produced = cctx->producedCSize; + fp.flushed = cctx->producedCSize; /* simplified; some data might still be left within streaming output buffer */ fp.currentJobID = 0; + fp.nbActiveWorkers = 0; return fp; } } diff --git a/lib/compress/zstdmt_compress.c b/lib/compress/zstdmt_compress.c index 49502bd0d..b61cd50ee 100644 --- a/lib/compress/zstdmt_compress.c +++ b/lib/compress/zstdmt_compress.c @@ -1058,7 +1058,7 @@ static size_t ZSTDMT_resize(ZSTDMT_CCtx* mtctx, unsigned nbWorkers) /*! ZSTDMT_updateCParams_whileCompressing() : - * Updates only a selected set of compression parameters, to remain compatible with current frame. + * Updates a selected set of compression parameters, remaining compatible with currently active frame. * New parameters will be applied to next compression job. */ void ZSTDMT_updateCParams_whileCompressing(ZSTDMT_CCtx* mtctx, const ZSTD_CCtx_params* cctxParams) { @@ -1076,27 +1076,31 @@ void ZSTDMT_updateCParams_whileCompressing(ZSTDMT_CCtx* mtctx, const ZSTD_CCtx_p /* ZSTDMT_getFrameProgression(): * tells how much data has been consumed (input) and produced (output) for current frame. * able to count progression inside worker threads. - * Note : mutex will be acquired during statistics collection. */ + * Note : mutex will be acquired during statistics collection inside workers. */ ZSTD_frameProgression ZSTDMT_getFrameProgression(ZSTDMT_CCtx* mtctx) { ZSTD_frameProgression fps; DEBUGLOG(5, "ZSTDMT_getFrameProgression"); fps.ingested = mtctx->consumed + mtctx->inBuff.filled; fps.consumed = mtctx->consumed; - fps.produced = mtctx->produced; + fps.produced = fps.flushed = mtctx->produced; fps.currentJobID = mtctx->nextJobID; + fps.nbActiveWorkers = 0; { unsigned jobNb; unsigned lastJobNb = mtctx->nextJobID + mtctx->jobReady; assert(mtctx->jobReady <= 1); DEBUGLOG(6, "ZSTDMT_getFrameProgression: jobs: from %u to <%u (jobReady:%u)", mtctx->doneJobID, lastJobNb, mtctx->jobReady) for (jobNb = mtctx->doneJobID ; jobNb < lastJobNb ; jobNb++) { unsigned const wJobID = jobNb & mtctx->jobIDMask; - ZSTD_pthread_mutex_lock(&mtctx->jobs[wJobID].job_mutex); - { size_t const cResult = mtctx->jobs[wJobID].cSize; + ZSTDMT_jobDescription* jobPtr = &mtctx->jobs[wJobID]; + ZSTD_pthread_mutex_lock(&jobPtr->job_mutex); + { size_t const cResult = jobPtr->cSize; size_t const produced = ZSTD_isError(cResult) ? 0 : cResult; - fps.ingested += mtctx->jobs[wJobID].src.size; - fps.consumed += mtctx->jobs[wJobID].consumed; + fps.ingested += jobPtr->src.size; + fps.consumed += jobPtr->consumed; fps.produced += produced; + fps.flushed += jobPtr->dstFlushed; + fps.nbActiveWorkers += (jobPtr->consumed < jobPtr->src.size); } ZSTD_pthread_mutex_unlock(&mtctx->jobs[wJobID].job_mutex); } diff --git a/lib/zstd.h b/lib/zstd.h index edd0079c9..02e447b30 100644 --- a/lib/zstd.h +++ b/lib/zstd.h @@ -732,10 +732,12 @@ ZSTDLIB_API size_t ZSTD_resetCStream(ZSTD_CStream* zcs, unsigned long long pledg typedef struct { - unsigned long long ingested; - unsigned long long consumed; - unsigned long long produced; - unsigned currentJobID; + unsigned long long ingested; /* nb input bytes read and buffered */ + unsigned long long consumed; /* nb input bytes actually compressed */ + unsigned long long produced; /* nb of compressed bytes generated and buffered */ + unsigned long long flushed; /* nb of compressed bytes flushed : not provided; can be tracked from caller side */ + unsigned currentJobID; /* MT only : latest started job nb */ + unsigned nbActiveWorkers; /* MT only : nb of workers actively compressing at probe time */ } ZSTD_frameProgression; /* ZSTD_getFrameProgression(): diff --git a/programs/fileio.c b/programs/fileio.c index b29936191..68b2f1593 100644 --- a/programs/fileio.c +++ b/programs/fileio.c @@ -800,16 +800,17 @@ FIO_compressZstdFrame(const cRess_t* ressPtr, /* check output speed */ if (zfp.currentJobID > 1) { - static ZSTD_frameProgression cpszfp = { 0, 0, 0, 0 }; - static unsigned long long lastFlushedSize = 0; + static ZSTD_frameProgression cpszfp = { 0, 0, 0, 0, 0, 0 }; unsigned long long newlyProduced = zfp.produced - cpszfp.produced; - unsigned long long newlyFlushed = compressedfilesize - lastFlushedSize; + unsigned long long newlyFlushed = zfp.flushed - cpszfp.flushed; assert(zfp.produced >= cpszfp.produced); + cpszfp = zfp; + if ( (zfp.ingested == cpszfp.ingested) && (zfp.consumed == cpszfp.consumed) ) { - DISPLAYLEVEL(6, "no data read nor consumed : buffers are full (?) or compression is slow + input has reached its limit. If buffers full : output is too slow => slow down \n") + DISPLAYLEVEL(2, "no data read nor consumed : buffers are full (?) output is too slow => slow down ; or compression is slow + input has reached its limit => can't tell \n") speedChange = slower; } @@ -818,8 +819,6 @@ FIO_compressZstdFrame(const cRess_t* ressPtr, DISPLAYLEVEL(6, "production faster than flushing (%llu > %llu) but there is still %u bytes to flush => slow down \n", newlyProduced, newlyFlushed, (U32)stillToFlush); speedChange = slower; } - cpszfp = zfp; - lastFlushedSize = compressedfilesize; } /* course correct only if there is at least one new job completed */ @@ -832,14 +831,12 @@ FIO_compressZstdFrame(const cRess_t* ressPtr, DISPLAYLEVEL(6, "input is never blocked => input is too slow \n"); speedChange = slower; } else if (speedChange == noChange) { - static ZSTD_frameProgression csuzfp = { 0, 0, 0, 0 }; - static unsigned long long lastFlushedSize = 0; + static ZSTD_frameProgression csuzfp = { 0, 0, 0, 0, 0, 0 }; unsigned long long newlyIngested = zfp.ingested - csuzfp.ingested; unsigned long long newlyConsumed = zfp.consumed - csuzfp.consumed; unsigned long long newlyProduced = zfp.produced - csuzfp.produced; - unsigned long long newlyFlushed = compressedfilesize - lastFlushedSize; + unsigned long long newlyFlushed = zfp.flushed - csuzfp.flushed; csuzfp = zfp; - lastFlushedSize = compressedfilesize; assert(inputPresented > 0); DISPLAYLEVEL(6, "input blocked %u/%u(%.2f) - ingested:%u vs %u:consumed - flushed:%u vs %u:produced \n", inputBlocked, inputPresented, (double)inputBlocked/inputPresented*100,