From: Yann Collet Date: Wed, 19 Sep 2018 21:49:13 +0000 (-0700) Subject: error out when --adapt is associated with --single-thread X-Git-Tag: v1.3.6^2~19^2~10 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=89bc309d904d1472288b9458687994987a8ce68d;p=thirdparty%2Fzstd.git error out when --adapt is associated with --single-thread since they are not compatible --- diff --git a/lib/zstd.h b/lib/zstd.h index 25441e68c..669161b41 100644 --- a/lib/zstd.h +++ b/lib/zstd.h @@ -736,13 +736,14 @@ ZSTDLIB_API size_t ZSTD_initCStream_usingCDict_advanced(ZSTD_CStream* zcs, const /*! ZSTD_resetCStream() : * start a new compression job, using same parameters from previous job. - * This is typically useful to skip dictionary loading stage, since it will re-use it in-place.. + * This is typically useful to skip dictionary loading stage, since it will re-use it in-place. * Note that zcs must be init at least once before using ZSTD_resetCStream(). * If pledgedSrcSize is not known at reset time, use macro ZSTD_CONTENTSIZE_UNKNOWN. * If pledgedSrcSize > 0, its value must be correct, as it will be written in header, and controlled at the end. * For the time being, pledgedSrcSize==0 is interpreted as "srcSize unknown" for compatibility with older programs, * but it will change to mean "empty" in future version, so use macro ZSTD_CONTENTSIZE_UNKNOWN instead. - * @return : 0, or an error code (which can be tested using ZSTD_isError()) */ + * @return : 0, or an error code (which can be tested using ZSTD_isError()) + */ ZSTDLIB_API size_t ZSTD_resetCStream(ZSTD_CStream* zcs, unsigned long long pledgedSrcSize); @@ -755,21 +756,27 @@ typedef struct { unsigned nbActiveWorkers; /* MT only : nb of workers actively compressing at probe time */ } ZSTD_frameProgression; -/* ZSTD_getFrameProgression(): +/* ZSTD_getFrameProgression() : * tells how much data has been ingested (read from input) * consumed (input actually compressed) and produced (output) for current frame. - * Therefore, (ingested - consumed) is amount of input data buffered internally, not yet compressed. - * Can report progression inside worker threads (multi-threading and non-blocking mode). + * Note : (ingested - consumed) is amount of input data buffered internally, not yet compressed. + * Aggregates progression inside active worker threads. */ ZSTDLIB_API ZSTD_frameProgression ZSTD_getFrameProgression(const ZSTD_CCtx* cctx); -/*! ZSTD_toFlushNow() +/*! ZSTD_toFlushNow() : * Tell how many bytes are ready to be flushed immediately. * Useful for multithreading scenarios (nbWorkers >= 1). - * Probe the oldest active job (not yet entirely flushed) and check its output buffer. - * If return 0, it means there is no active job, or - * it means oldest job is still active, but everything produced has been flushed so far, - * therefore flushing is limited by speed of oldest job. */ + * Probe the oldest active job, defined as oldest job not yet entirely flushed, + * and check its output buffer. + * @return : amount of data stored in oldest job and ready to be flushed immediately. + * if @return == 0, it means either : + * + there is no active job (could be checked with ZSTD_frameProgression()), or + * + oldest job is still actively compressing data, + * but everything it has produced has also been flushed so far, + * therefore flushing speed is currently limited by production speed of oldest job + * irrespective of the speed of concurrent newer jobs. + */ ZSTDLIB_API size_t ZSTD_toFlushNow(ZSTD_CCtx* cctx); diff --git a/programs/fileio.c b/programs/fileio.c index 6ea43c902..701e30e8f 100644 --- a/programs/fileio.c +++ b/programs/fileio.c @@ -283,7 +283,11 @@ void FIO_setOverlapLog(unsigned overlapLog){ g_overlapLog = overlapLog; } static U32 g_adaptiveMode = 0; -void FIO_setAdaptiveMode(unsigned adapt) { g_adaptiveMode = adapt; } +void FIO_setAdaptiveMode(unsigned adapt) { + if ((adapt>0) && (g_nbWorkers==0)) + EXM_THROW(1, "Adaptive mode is not compatible with single thread mode \n"); + g_adaptiveMode = adapt; +} static U32 g_ldmFlag = 0; void FIO_setLdmFlag(unsigned ldmFlag) { g_ldmFlag = (ldmFlag>0); @@ -541,7 +545,8 @@ static void FIO_freeCResources(cRess_t ress) #ifdef ZSTD_GZCOMPRESS -static unsigned long long FIO_compressGzFrame(cRess_t* ress, +static unsigned long long +FIO_compressGzFrame(cRess_t* ress, const char* srcFileName, U64 const srcFileSize, int compressionLevel, U64* readsize) { @@ -623,9 +628,10 @@ static unsigned long long FIO_compressGzFrame(cRess_t* ress, #ifdef ZSTD_LZMACOMPRESS -static unsigned long long FIO_compressLzmaFrame(cRess_t* ress, - const char* srcFileName, U64 const srcFileSize, - int compressionLevel, U64* readsize, int plain_lzma) +static unsigned long long +FIO_compressLzmaFrame(cRess_t* ress, + const char* srcFileName, U64 const srcFileSize, + int compressionLevel, U64* readsize, int plain_lzma) { unsigned long long inFileSize = 0, outFileSize = 0; lzma_stream strm = LZMA_STREAM_INIT; @@ -698,9 +704,10 @@ static unsigned long long FIO_compressLzmaFrame(cRess_t* ress, #define LZ4F_max64KB max64KB #endif static int FIO_LZ4_GetBlockSize_FromBlockId (int id) { return (1 << (8 + (2 * id))); } -static unsigned long long FIO_compressLz4Frame(cRess_t* ress, - const char* srcFileName, U64 const srcFileSize, - int compressionLevel, U64* readsize) +static unsigned long long +FIO_compressLz4Frame(cRess_t* ress, + const char* srcFileName, U64 const srcFileSize, + int compressionLevel, U64* readsize) { const size_t blockSize = FIO_LZ4_GetBlockSize_FromBlockId(LZ4F_max64KB); unsigned long long inFileSize = 0, outFileSize = 0; @@ -838,7 +845,7 @@ FIO_compressZstdFrame(const cRess_t* ressPtr, /* count stats */ inputPresented++; - if (oldIPos == inBuff.pos) inputBlocked++; + if (oldIPos == inBuff.pos) inputBlocked++; /* input buffer is full and can't take any more : input speed is faster than consumption rate */ if (!toFlushNow) flushWaiting = 1; /* Write compressed stream */ @@ -846,7 +853,7 @@ FIO_compressZstdFrame(const cRess_t* ressPtr, (U32)directive, (U32)inBuff.pos, (U32)inBuff.size, (U32)outBuff.pos); if (outBuff.pos) { size_t const sizeCheck = fwrite(ress.dstBuffer, 1, outBuff.pos, dstFile); - if (sizeCheck!=outBuff.pos) + if (sizeCheck != outBuff.pos) EXM_THROW(25, "Write error : cannot write compressed block"); compressedfilesize += outBuff.pos; } @@ -857,24 +864,24 @@ FIO_compressZstdFrame(const cRess_t* ressPtr, double const cShare = (double)zfp.produced / (zfp.consumed + !zfp.consumed/*avoid div0*/) * 100; /* check output speed */ - if (zfp.currentJobID > 1) { - static ZSTD_frameProgression cpszfp = { 0, 0, 0, 0, 0, 0 }; + if (zfp.currentJobID > 1) { /* only possible if nbWorkers >= 1 */ + static ZSTD_frameProgression cpszfp = { 0, 0, 0, 0, 0, 0 }; /* note : requires fileio to run main thread */ unsigned long long newlyProduced = zfp.produced - cpszfp.produced; unsigned long long newlyFlushed = zfp.flushed - cpszfp.flushed; assert(zfp.produced >= cpszfp.produced); - - cpszfp = zfp; + assert(g_nbWorkers >= 1); if ( (zfp.ingested == cpszfp.ingested) /* no data read : input buffer full */ && (zfp.consumed == cpszfp.consumed) /* no data compressed : no more buffer to compress OR compression is really slow */ && (zfp.nbActiveWorkers == 0) /* confirmed : no compression : either no more buffer to compress OR not enough data to start first worker */ - && (zfp.currentJobID > 0) /* first job started : only remaining reason is no more available buffer to start compression */ ) { DISPLAYLEVEL(6, "all buffers full : compression stopped => slow down \n") speedChange = slower; } + cpszfp = zfp; + if ( (newlyProduced > (newlyFlushed * 9 / 8)) /* compression produces more data than output can flush (though production can be spiky, due to work unit : (N==4)*block sizes) */ && (flushWaiting == 0) /* flush speed was never slowed by lack of production, so it's operating at max capacity */ ) {