From: Yann Collet Date: Wed, 28 Jun 2017 18:09:43 +0000 (-0700) Subject: fixed ZSTD_refPrefix with Multithread-enabled CCtx X-Git-Tag: v1.3.0~1^2~10^2~3 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=33a66390398334dc6a84e6b334194f226986e306;p=thirdparty%2Fzstd.git fixed ZSTD_refPrefix with Multithread-enabled CCtx --- diff --git a/lib/common/zstd_internal.h b/lib/common/zstd_internal.h index bcf93e58d..b2c7cfcdd 100644 --- a/lib/common/zstd_internal.h +++ b/lib/common/zstd_internal.h @@ -71,12 +71,17 @@ #if defined(ZSTD_DEBUG) && (ZSTD_DEBUG>=2) # include - static unsigned g_debugLevel = ZSTD_DEBUG; -# define DEBUGLOG(l, ...) { \ - if (l<=g_debugLevel) { \ - fprintf(stderr, __FILE__ ": "); \ - fprintf(stderr, __VA_ARGS__); \ - fprintf(stderr, " \n"); \ +/* recommended values for ZSTD_DEBUG display levels : + * 2 : reserved for currently active debugging path + * 3 : events once per object lifetime (CCtx, CDict) + * 4 : events once per frame + * 5 : events once per block + * 6 : events once per sequence (*very* verbose) */ +# define DEBUGLOG(l, ...) { \ + if (l<=ZSTD_DEBUG) { \ + fprintf(stderr, __FILE__ ": "); \ + fprintf(stderr, __VA_ARGS__); \ + fprintf(stderr, " \n"); \ } } #else # define DEBUGLOG(l, ...) {} /* disabled */ diff --git a/lib/compress/zstd_compress.c b/lib/compress/zstd_compress.c index 41be22edd..9ea866bf2 100644 --- a/lib/compress/zstd_compress.c +++ b/lib/compress/zstd_compress.c @@ -3924,29 +3924,30 @@ size_t ZSTD_compress_generic (ZSTD_CCtx* cctx, if (cctx->streamStage == zcss_init) { /* transparent reset */ + const void* const prefix = cctx->prefix; + size_t const prefixSize = cctx->prefixSize; ZSTD_parameters params = cctx->requestedParams; if (cctx->compressionLevel != ZSTD_CLEVEL_CUSTOM) params.cParams = ZSTD_getCParams(cctx->compressionLevel, - cctx->pledgedSrcSizePlusOne-1, 0 /* dictSize */); + cctx->pledgedSrcSizePlusOne-1, 0 /*dictSize*/); + cctx->prefix = NULL; cctx->prefixSize = 0; /* single usage */ + assert(prefix==NULL || cctx->cdict==NULL); /* only one can be set */ #ifdef ZSTD_MULTITHREAD if (cctx->nbThreads > 1) { - DEBUGLOG(4, "call ZSTDMT_initCStream_internal"); - CHECK_F( ZSTDMT_initCStream_internal(cctx->mtctx, NULL, 0, cctx->cdict, params, cctx->pledgedSrcSizePlusOne-1) ); + DEBUGLOG(4, "call ZSTDMT_initCStream_internal as nbThreads=%u", cctx->nbThreads); + CHECK_F( ZSTDMT_initCStream_internal(cctx->mtctx, prefix, prefixSize, cctx->cdict, params, cctx->pledgedSrcSizePlusOne-1) ); cctx->streamStage = zcss_load; } else #endif { - const void* const prefix = cctx->prefix; - size_t const prefixSize = cctx->prefixSize; - cctx->prefix = NULL; cctx->prefixSize = 0; /* single usage */ CHECK_F( ZSTD_resetCStream_internal(cctx, prefix, prefixSize, cctx->dictMode, cctx->cdict, params, cctx->pledgedSrcSizePlusOne-1) ); } } #ifdef ZSTD_MULTITHREAD if (cctx->nbThreads > 1) { size_t const flushMin = ZSTDMT_compressStream_generic(cctx->mtctx, output, input, endOp); - DEBUGLOG(4, "ZSTDMT_compressStream_generic : %u", (U32)flushMin); + DEBUGLOG(5, "ZSTDMT_compressStream_generic : %u", (U32)flushMin); if ( ZSTD_isError(flushMin) || (endOp == ZSTD_e_end && flushMin == 0) ) { /* compression completed */ ZSTD_startNewCompression(cctx); diff --git a/lib/compress/zstdmt_compress.c b/lib/compress/zstdmt_compress.c index 8a167ce1d..d5f08c76f 100644 --- a/lib/compress/zstdmt_compress.c +++ b/lib/compress/zstdmt_compress.c @@ -586,7 +586,7 @@ size_t ZSTDMT_initCStream_internal(ZSTDMT_CCtx* zcs, if (dict) { ZSTD_freeCDict(zcs->cdictLocal); zcs->cdictLocal = ZSTD_createCDict_advanced(dict, dictSize, - 0 /* byRef */, ZSTD_dm_auto, + 0 /* byRef */, ZSTD_dm_auto, /* note : a loadPrefix becomes an internal CDict */ params.cParams, zcs->cMem); zcs->cdict = zcs->cdictLocal; if (zcs->cdictLocal == NULL) return ERROR(memory_allocation); diff --git a/lib/decompress/zstd_decompress.c b/lib/decompress/zstd_decompress.c index 61f766e21..a4844c5b3 100644 --- a/lib/decompress/zstd_decompress.c +++ b/lib/decompress/zstd_decompress.c @@ -750,6 +750,7 @@ size_t ZSTD_decodeSeqHeaders(ZSTD_DCtx* dctx, int* nbSeqPtr, const BYTE* const istart = (const BYTE* const)src; const BYTE* const iend = istart + srcSize; const BYTE* ip = istart; + DEBUGLOG(5, "ZSTD_decodeSeqHeaders"); /* check */ if (srcSize < MIN_SEQUENCES_SIZE) return ERROR(srcSize_wrong); @@ -930,12 +931,18 @@ static seq_t ZSTD_decodeSequence(seqState_t* seqState) seq.offset = offset; } - seq.matchLength = ML_base[mlCode] + ((mlCode>31) ? BIT_readBitsFast(&seqState->DStream, mlBits) : 0); /* <= 16 bits */ + seq.matchLength = ML_base[mlCode] + + ((mlCode>31) ? BIT_readBitsFast(&seqState->DStream, mlBits) : 0); /* <= 16 bits */ if (MEM_32bits() && (mlBits+llBits>24)) BIT_reloadDStream(&seqState->DStream); - seq.litLength = LL_base[llCode] + ((llCode>15) ? BIT_readBitsFast(&seqState->DStream, llBits) : 0); /* <= 16 bits */ - if (MEM_32bits() || - (totalBits > 64 - 7 - (LLFSELog+MLFSELog+OffFSELog)) ) BIT_reloadDStream(&seqState->DStream); + seq.litLength = LL_base[llCode] + + ((llCode>15) ? BIT_readBitsFast(&seqState->DStream, llBits) : 0); /* <= 16 bits */ + if ( MEM_32bits() + || (totalBits > 64 - 7 - (LLFSELog+MLFSELog+OffFSELog)) ) + BIT_reloadDStream(&seqState->DStream); + + DEBUGLOG(6, "seq: litL=%u, matchL=%u, offset=%u", + (U32)seq.litLength, (U32)seq.matchLength, (U32)seq.offset); /* ANS state update */ FSE_updateState(&seqState->stateLL, &seqState->DStream); /* <= 9 bits */ @@ -975,7 +982,8 @@ size_t ZSTD_execSequence(BYTE* op, /* copy Match */ if (sequence.offset > (size_t)(oLitEnd - base)) { /* offset beyond prefix -> go into extDict */ - if (sequence.offset > (size_t)(oLitEnd - vBase)) return ERROR(corruption_detected); + if (sequence.offset > (size_t)(oLitEnd - vBase)) + return ERROR(corruption_detected); match = dictEnd + (match - base); if (match + sequence.matchLength <= dictEnd) { memmove(oLitEnd, match, sequence.matchLength); @@ -1043,9 +1051,12 @@ static size_t ZSTD_decompressSequences( const BYTE* const vBase = (const BYTE*) (dctx->vBase); const BYTE* const dictEnd = (const BYTE*) (dctx->dictEnd); int nbSeq; + DEBUGLOG(5, "ZSTD_decompressSequences"); /* Build Decoding Tables */ { size_t const seqHSize = ZSTD_decodeSeqHeaders(dctx, &nbSeq, ip, seqSize); + DEBUGLOG(5, "ZSTD_decodeSeqHeaders: size=%u, nbSeq=%i", + (U32)seqHSize, nbSeq); if (ZSTD_isError(seqHSize)) return seqHSize; ip += seqHSize; } @@ -1064,11 +1075,13 @@ static size_t ZSTD_decompressSequences( nbSeq--; { seq_t const sequence = ZSTD_decodeSequence(&seqState); size_t const oneSeqSize = ZSTD_execSequence(op, oend, sequence, &litPtr, litEnd, base, vBase, dictEnd); + DEBUGLOG(6, "regenerated sequence size : %u", (U32)oneSeqSize); if (ZSTD_isError(oneSeqSize)) return oneSeqSize; op += oneSeqSize; } } /* check if reached exact end */ + DEBUGLOG(5, "after decode loop, remaining nbSeq : %i", nbSeq); if (nbSeq) return ERROR(corruption_detected); /* save reps for next block */ { U32 i; for (i=0; ientropy.rep[i] = (U32)(seqState.prevOffset[i]); } @@ -1355,11 +1368,13 @@ static size_t ZSTD_decompressBlock_internal(ZSTD_DCtx* dctx, const void* src, size_t srcSize) { /* blockType == blockCompressed */ const BYTE* ip = (const BYTE*)src; + DEBUGLOG(5, "ZSTD_decompressBlock_internal"); if (srcSize >= ZSTD_BLOCKSIZE_MAX) return ERROR(srcSize_wrong); /* Decode literals section */ { size_t const litCSize = ZSTD_decodeLiteralsBlock(dctx, src, srcSize); + DEBUGLOG(5, "ZSTD_decodeLiteralsBlock : %u", (U32)litCSize); if (ZSTD_isError(litCSize)) return litCSize; ip += litCSize; srcSize -= litCSize; @@ -1697,6 +1712,7 @@ static int ZSTD_isSkipFrame(ZSTD_DCtx* dctx) { return dctx->stage == ZSTDds_skip * or an error code, which can be tested using ZSTD_isError() */ size_t ZSTD_decompressContinue(ZSTD_DCtx* dctx, void* dst, size_t dstCapacity, const void* src, size_t srcSize) { + DEBUGLOG(5, "ZSTD_decompressContinue"); /* Sanity check */ if (srcSize != dctx->expected) return ERROR(srcSize_wrong); /* unauthorized */ if (dstCapacity) ZSTD_checkContinuity(dctx, dst); @@ -1756,10 +1772,12 @@ size_t ZSTD_decompressContinue(ZSTD_DCtx* dctx, void* dst, size_t dstCapacity, c } case ZSTDds_decompressLastBlock: case ZSTDds_decompressBlock: + DEBUGLOG(5, "case ZSTDds_decompressBlock"); { size_t rSize; switch(dctx->bType) { case bt_compressed: + DEBUGLOG(5, "case bt_compressed"); rSize = ZSTD_decompressBlock_internal(dctx, dst, dstCapacity, src, srcSize); break; case bt_raw : @@ -2323,7 +2341,7 @@ size_t ZSTD_decompressStream(ZSTD_DStream* zds, ZSTD_outBuffer* output, ZSTD_inB } } /* Consume header (see ZSTDds_decodeFrameHeader) */ - DEBUGLOG(5, "Consume header"); + DEBUGLOG(4, "Consume header"); CHECK_F(ZSTD_decompressBegin_usingDDict(zds, zds->ddict)); if ((MEM_readLE32(zds->headerBuffer) & 0xFFFFFFF0U) == ZSTD_MAGIC_SKIPPABLE_START) { /* skippable frame */ @@ -2336,7 +2354,7 @@ size_t ZSTD_decompressStream(ZSTD_DStream* zds, ZSTD_outBuffer* output, ZSTD_inB } /* control buffer memory usage */ - DEBUGLOG(5, "Control max buffer memory usage"); + DEBUGLOG(4, "Control max buffer memory usage"); zds->fParams.windowSize = MAX(zds->fParams.windowSize, 1U << ZSTD_WINDOWLOG_ABSOLUTEMIN); if (zds->fParams.windowSize > zds->maxWindowSize) return ERROR(frameParameter_windowTooLarge); @@ -2346,12 +2364,12 @@ size_t ZSTD_decompressStream(ZSTD_DStream* zds, ZSTD_outBuffer* output, ZSTD_inB zds->blockSize = blockSize; if ((zds->inBuffSize < blockSize) || (zds->outBuffSize < neededOutSize)) { size_t const bufferSize = blockSize + neededOutSize; - DEBUGLOG(5, "inBuff : from %u to %u", + DEBUGLOG(4, "inBuff : from %u to %u", (U32)zds->inBuffSize, (U32)blockSize); - DEBUGLOG(5, "outBuff : from %u to %u", + DEBUGLOG(4, "outBuff : from %u to %u", (U32)zds->outBuffSize, (U32)neededOutSize); if (zds->staticSize) { /* static DCtx */ - DEBUGLOG(5, "staticSize : %u", (U32)zds->staticSize); + DEBUGLOG(4, "staticSize : %u", (U32)zds->staticSize); assert(zds->staticSize >= sizeof(ZSTD_DCtx)); /* controlled at init */ if (bufferSize > zds->staticSize - sizeof(ZSTD_DCtx)) return ERROR(memory_allocation); diff --git a/tests/zstreamtest.c b/tests/zstreamtest.c index 4c4479557..e4d40c66a 100644 --- a/tests/zstreamtest.c +++ b/tests/zstreamtest.c @@ -1382,8 +1382,10 @@ static int fuzzerTests_newAPI(U32 seed, U32 nbTests, unsigned startTest, double /* multi - fragments decompression test */ if (!dictSize /* don't reset if dictionary : could be different */ && (FUZ_rand(&lseed) & 1)) { + DISPLAYLEVEL(5, "resetting DCtx (dict:%08X) \n", (U32)(size_t)dict); CHECK_Z( ZSTD_resetDStream(zd) ); } else { + DISPLAYLEVEL(5, "using dict of size %u \n", (U32)dictSize); CHECK_Z( ZSTD_initDStream_usingDict(zd, dict, dictSize) ); } { size_t decompressionResult = 1; @@ -1395,7 +1397,8 @@ static int fuzzerTests_newAPI(U32 seed, U32 nbTests, unsigned startTest, double size_t const dstBuffSize = MIN(dstBufferSize - totalGenSize, randomDstSize); inBuff.size = inBuff.pos + readCSrcSize; outBuff.size = inBuff.pos + dstBuffSize; - DISPLAYLEVEL(5, "ZSTD_decompressStream input %u bytes \n", (U32)readCSrcSize); + DISPLAYLEVEL(5, "ZSTD_decompressStream input %u bytes (pos:%u/%u)\n", + (U32)readCSrcSize, (U32)inBuff.pos, (U32)cSize); decompressionResult = ZSTD_decompressStream(zd, &outBuff, &inBuff); CHECK (ZSTD_isError(decompressionResult), "decompression error : %s", ZSTD_getErrorName(decompressionResult)); DISPLAYLEVEL(5, "inBuff.pos = %u \n", (U32)readCSrcSize);