From: Yann Collet Date: Sat, 18 Aug 2018 01:11:54 +0000 (-0700) Subject: created ZSTDMT_toFlushNow() X-Git-Tag: v1.3.6^2~19^2~13 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=105677c6dbc5bfeaf8a0e52334104322d4618d5c;p=thirdparty%2Fzstd.git created ZSTDMT_toFlushNow() tells in a non-blocking way if there is something ready to flush right now. only works with multi-threading for the time being. Useful to know if flush speed will be limited by lack of production. --- diff --git a/lib/compress/zstd_compress.c b/lib/compress/zstd_compress.c index 4bc18d2ec..34d4f5b0f 100644 --- a/lib/compress/zstd_compress.c +++ b/lib/compress/zstd_compress.c @@ -906,6 +906,20 @@ ZSTD_frameProgression ZSTD_getFrameProgression(const ZSTD_CCtx* cctx) return fp; } } +/*! ZSTD_toFlushNow() + * Only useful for multithreading scenarios currently (nbWorkers >= 1). + */ +size_t ZSTD_toFlushNow(ZSTD_CCtx* cctx) +{ +#ifdef ZSTD_MULTITHREAD + if (cctx->appliedParams.nbWorkers > 0) { + return ZSTDMT_toFlushNow(cctx->mtctx); + } +#endif + return 0; /* over-simplification; could also check if context is currently running in streaming mode, and in which case, report how many bytes are left to be flushed within output buffer */ +} + + static U32 ZSTD_equivalentCParams(ZSTD_compressionParameters cParams1, ZSTD_compressionParameters cParams2) diff --git a/lib/compress/zstdmt_compress.c b/lib/compress/zstdmt_compress.c index b61cd50ee..cc1af58e6 100644 --- a/lib/compress/zstdmt_compress.c +++ b/lib/compress/zstdmt_compress.c @@ -1096,20 +1096,45 @@ ZSTD_frameProgression ZSTDMT_getFrameProgression(ZSTDMT_CCtx* mtctx) ZSTD_pthread_mutex_lock(&jobPtr->job_mutex); { size_t const cResult = jobPtr->cSize; size_t const produced = ZSTD_isError(cResult) ? 0 : cResult; + size_t const flushed = ZSTD_isError(cResult) ? 
0 : jobPtr->dstFlushed; + assert(flushed <= produced); fps.ingested += jobPtr->src.size; fps.consumed += jobPtr->consumed; fps.produced += produced; - fps.flushed += jobPtr->dstFlushed; + fps.flushed += flushed; fps.nbActiveWorkers += (jobPtr->consumed < jobPtr->src.size); } ZSTD_pthread_mutex_unlock(&mtctx->jobs[wJobID].job_mutex); } } - DEBUGLOG(5, "ZSTDMT_getFrameProgression : completed"); return fps; } +size_t ZSTDMT_toFlushNow(ZSTDMT_CCtx* mtctx) +{ + size_t toFlush; + unsigned const jobID = mtctx->doneJobID; + assert(jobID <= mtctx->nextJobID); + if (jobID == mtctx->nextJobID) return 0; /* no active job => nothing to flush */ + + { unsigned const wJobID = jobID & mtctx->jobIDMask; + ZSTDMT_jobDescription* jobPtr = &mtctx->jobs[wJobID]; + ZSTD_pthread_mutex_lock(&jobPtr->job_mutex); + { size_t const cResult = jobPtr->cSize; + size_t const produced = ZSTD_isError(cResult) ? 0 : cResult; + size_t const flushed = ZSTD_isError(cResult) ? 0 : jobPtr->dstFlushed; + assert(flushed <= produced); + toFlush = produced - flushed; + if (toFlush==0) assert(jobPtr->consumed < jobPtr->src.size); /* if toFlush==0, doneJobID should still be active: if doneJobID is completed and fully flushed, ZSTDMT_flushProduced() should have already moved to next job */ + } + ZSTD_pthread_mutex_unlock(&mtctx->jobs[wJobID].job_mutex); + } + + return toFlush; +} + + /* ------------------------------------------ */ /* ===== Multi-threaded compression ===== */ /* ------------------------------------------ */ diff --git a/lib/compress/zstdmt_compress.h b/lib/compress/zstdmt_compress.h index 34a475a42..12ad9f899 100644 --- a/lib/compress/zstdmt_compress.h +++ b/lib/compress/zstdmt_compress.h @@ -119,11 +119,21 @@ ZSTDLIB_API size_t ZSTDMT_compressStream_generic(ZSTDMT_CCtx* mtctx, * === Not exposed in libzstd. Never invoke directly === * ======================================================== */ + /*! ZSTDMT_toFlushNow() + * Tell how many bytes are ready to be flushed immediately. 
+ * Probe the oldest active job (not yet entirely flushed) and check its output buffer. + * If it returns 0, it means there is no active job, + * or, it means oldest job is still active, but everything produced has been flushed so far, + * therefore flushing is limited by speed of oldest job. */ +size_t ZSTDMT_toFlushNow(ZSTDMT_CCtx* mtctx); + +/*! ZSTDMT_CCtxParam_setMTCtxParameter() + * like ZSTDMT_setMTCtxParameter(), but into a ZSTD_CCtx_params */ size_t ZSTDMT_CCtxParam_setMTCtxParameter(ZSTD_CCtx_params* params, ZSTDMT_parameter parameter, unsigned value); -/* ZSTDMT_CCtxParam_setNbWorkers() - * Set nbWorkers, and clamp it. - * Also reset jobSize and overlapLog */ +/*! ZSTDMT_CCtxParam_setNbWorkers() + * Set nbWorkers, and clamp it. + * Also reset jobSize and overlapLog */ size_t ZSTDMT_CCtxParam_setNbWorkers(ZSTD_CCtx_params* params, unsigned nbWorkers); /*! ZSTDMT_updateCParams_whileCompressing() : @@ -131,9 +141,9 @@ size_t ZSTDMT_CCtxParam_setNbWorkers(ZSTD_CCtx_params* params, unsigned nbWorker * New parameters will be applied to next compression job. */ void ZSTDMT_updateCParams_whileCompressing(ZSTDMT_CCtx* mtctx, const ZSTD_CCtx_params* cctxParams); -/* ZSTDMT_getFrameProgression(): - * tells how much data has been consumed (input) and produced (output) for current frame. - * able to count progression inside worker threads. +/*! ZSTDMT_getFrameProgression(): + * tells how much data has been consumed (input) and produced (output) for current frame. + * able to count progression inside worker threads. */ ZSTD_frameProgression ZSTDMT_getFrameProgression(ZSTDMT_CCtx* mtctx); diff --git a/lib/zstd.h b/lib/zstd.h index 02e447b30..edb107c2b 100644 --- a/lib/zstd.h +++ b/lib/zstd.h @@ -746,7 +746,16 @@ typedef struct { * Therefore, (ingested - consumed) is amount of input data buffered internally, not yet compressed. * Can report progression inside worker threads (multi-threading and non-blocking mode). 
*/ -ZSTD_frameProgression ZSTD_getFrameProgression(const ZSTD_CCtx* cctx); +ZSTDLIB_API ZSTD_frameProgression ZSTD_getFrameProgression(const ZSTD_CCtx* cctx); + +/*! ZSTD_toFlushNow() + * Tell how many bytes are ready to be flushed immediately. + * Only useful for multithreading scenarios currently (nbWorkers >= 1); returns 0 otherwise. + * Probe the oldest active job (not yet entirely flushed) and check its output buffer. + * If it returns 0, it means there is no active job, or + * it means oldest job is still active, but everything produced has been flushed so far, + * therefore flushing is limited by speed of oldest job. */ +ZSTDLIB_API size_t ZSTD_toFlushNow(ZSTD_CCtx* cctx); diff --git a/programs/fileio.c b/programs/fileio.c index fcb43030a..aeacd0440 100644 --- a/programs/fileio.c +++ b/programs/fileio.c @@ -747,6 +747,7 @@ FIO_compressZstdFrame(const cRess_t* ressPtr, /* stats */ typedef enum { noChange, slower, faster } speedChange_e; speedChange_e speedChange = noChange; + unsigned flushWaiting = 0; unsigned inputPresented = 0; unsigned inputBlocked = 0; unsigned lastJobID = 0; @@ -777,11 +778,13 @@ FIO_compressZstdFrame(const cRess_t* ressPtr, size_t const oldIPos = inBuff.pos; ZSTD_outBuffer outBuff = { ress.dstBuffer, ress.dstBufferSize, 0 }; + size_t const toFlushNow = ZSTD_toFlushNow(ress.cctx); CHECK_V(stillToFlush, ZSTD_compress_generic(ress.cctx, &outBuff, &inBuff, directive)); /* count stats */ inputPresented++; if (oldIPos == inBuff.pos) inputBlocked++; + if (!toFlushNow) flushWaiting = 1; /* Write compressed stream */ DISPLAYLEVEL(6, "ZSTD_compress_generic(end:%u) => input pos(%u)<=(%u)size ; output generated %u bytes \n", @@ -817,11 +820,13 @@ FIO_compressZstdFrame(const cRess_t* ressPtr, speedChange = slower; } - if ( (newlyProduced > (newlyFlushed * 9 / 8)) - && (stillToFlush > ZSTD_BLOCKSIZE_MAX) ) { - DISPLAYLEVEL(6, "production faster than flushing (%llu > %llu) but there is still %u bytes to flush => slow down \n", newlyProduced, newlyFlushed, (U32)stillToFlush); + if ( 
(newlyProduced > (newlyFlushed * 9 / 8)) /* compression produces more data than output can flush (though production can be spiky, due to work unit : (N==4)*block sizes) */ + && (flushWaiting == 0) /* flush speed was never slowed by lack of production, so it's operating at max capacity */ + ) { + DISPLAYLEVEL(6, "compression faster than flush (%llu > %llu), and flushed was never slowed down by lack of production => slow down \n", newlyProduced, newlyFlushed); speedChange = slower; } + flushWaiting = 0; } /* course correct only if there is at least one new job completed */