From: Yann Collet Date: Thu, 28 Jul 2016 17:55:09 +0000 (+0200) Subject: zbuff uses ZSTD_compressEnd() X-Git-Tag: v0.8.0^2~22 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=60ba31c5703deee1df6b22024001add9b65bf3d4;p=thirdparty%2Fzstd.git zbuff uses ZSTD_compressEnd() --- diff --git a/lib/common/zbuff.h b/lib/common/zbuff.h index b31c3e631..8e6305e64 100644 --- a/lib/common/zbuff.h +++ b/lib/common/zbuff.h @@ -137,8 +137,8 @@ ZSTDLIB_API size_t ZBUFF_decompressContinue(ZBUFF_DCtx* dctx, * The function will report how many bytes were read or written by modifying *srcSizePtr and *dstCapacityPtr. * Note that it may not consume the entire input, in which case it's up to the caller to present remaining input again. * The content of `dst` will be overwritten (up to *dstCapacityPtr) at each function call, so save its content if it matters, or change `dst`. -* @return : a hint to preferred nb of bytes to use as input for next function call (it's only a hint, to help latency), -* or 0 when a frame is completely decoded, +* @return : 0 when a frame is completely decoded and fully flushed, + >0 when decoding is not finished, with value being a suggested next input size (it's just a hint, tends to help latency), * or an error code, which can be tested using ZBUFF_isError(). * * Hint : recommended buffer sizes (not compulsory) : ZBUFF_recommendedDInSize() and ZBUFF_recommendedDOutSize() diff --git a/lib/compress/zbuff_compress.c b/lib/compress/zbuff_compress.c index 25a142203..5d9291857 100644 --- a/lib/compress/zbuff_compress.c +++ b/lib/compress/zbuff_compress.c @@ -46,7 +46,7 @@ static size_t const ZBUFF_endFrameSize = ZSTD_BLOCKHEADERSIZE; -/*_************************************************** +/*-*********************************************************** * Streaming compression * * A ZBUFF_CCtx object is required to track streaming operation. @@ -77,7 +77,7 @@ static size_t const ZBUFF_endFrameSize = ZSTD_BLOCKHEADERSIZE; * Hint : recommended buffer sizes (not compulsory) * input : ZSTD_BLOCKSIZE_MAX (128 KB), internal unit size, it improves latency to use this value. * output : ZSTD_compressBound(ZSTD_BLOCKSIZE_MAX) + ZSTD_blockHeaderSize + ZBUFF_endFrameSize : ensures it's always possible to write/flush/end a full block at best speed. -* **************************************************/ +* ***********************************************************/ typedef enum { ZBUFFcs_init, ZBUFFcs_load, ZBUFFcs_flush, ZBUFFcs_final } ZBUFF_cStage; @@ -96,6 +96,7 @@ struct ZBUFF_CCtx_s { size_t outBuffFlushedSize; ZBUFF_cStage stage; U32 checksum; + U32 frameEnded; ZSTD_customMem customMem; }; /* typedef'd tp ZBUFF_CCtx within "zstd_buffered.h" */ @@ -166,6 +167,7 @@ size_t ZBUFF_compressInit_advanced(ZBUFF_CCtx* zbc, zbc->outBuffContentSize = zbc->outBuffFlushedSize = 0; zbc->stage = ZBUFFcs_load; zbc->checksum = params.fParams.checksumFlag > 0; + zbc->frameEnded = 0; return 0; /* ready to go */ } @@ -198,7 +200,7 @@ typedef enum { zbf_gather, zbf_flush, zbf_end } ZBUFF_flush_e; static size_t ZBUFF_compressContinue_generic(ZBUFF_CCtx* zbc, void* dst, size_t* dstCapacityPtr, const void* src, size_t* srcSizePtr, - ZBUFF_flush_e flush) + ZBUFF_flush_e const flush) { U32 someMoreWork = 1; const char* const istart = (const char*)src; @@ -231,8 +233,11 @@ static size_t ZBUFF_compressContinue_generic(ZBUFF_CCtx* zbc, cDst = op; /* compress directly into output buffer (avoid flush stage) */ else cDst = zbc->outBuff, oSize = zbc->outBuffSize; - cSize = ZSTD_compressContinue(zbc->zc, cDst, oSize, zbc->inBuff + zbc->inToCompress, iSize); + cSize = (flush == zbf_end) ? + ZSTD_compressEnd(zbc->zc, cDst, oSize, zbc->inBuff + zbc->inToCompress, iSize) : + ZSTD_compressContinue(zbc->zc, cDst, oSize, zbc->inBuff + zbc->inToCompress, iSize); if (ZSTD_isError(cSize)) return cSize; + if (flush == zbf_end) zbc->frameEnded = 1; /* prepare next block */ zbc->inBuffTarget = zbc->inBuffPos + zbc->blockSize; if (zbc->inBuffTarget > zbc->inBuffSize) @@ -266,6 +271,7 @@ static size_t ZBUFF_compressContinue_generic(ZBUFF_CCtx* zbc, *srcSizePtr = ip - istart; *dstCapacityPtr = op - ostart; + if (zbc->frameEnded) return 0; { size_t hintInSize = zbc->inBuffTarget - zbc->inBuffPos; if (hintInSize==0) hintInSize = zbc->blockSize; return hintInSize; @@ -301,16 +307,17 @@ size_t ZBUFF_compressEnd(ZBUFF_CCtx* zbc, void* dst, size_t* dstCapacityPtr) /* flush whatever remains */ size_t outSize = *dstCapacityPtr; size_t srcSize = 0; - size_t const uselessHint = ZBUFF_compressContinue_generic(zbc, dst, &outSize, &srcSize, &srcSize, zbf_end); /* use a valid address instead of NULL */ + size_t const notEnded = ZBUFF_compressContinue_generic(zbc, dst, &outSize, &srcSize, &srcSize, zbf_end); /* use a valid address instead of NULL */ size_t const remainingToFlush = zbc->outBuffContentSize - zbc->outBuffFlushedSize; - op += outSize; (void)uselessHint; + op += outSize; if (remainingToFlush) { *dstCapacityPtr = op-ostart; return remainingToFlush + ZBUFF_endFrameSize + (zbc->checksum * 4); } /* create epilogue */ zbc->stage = ZBUFFcs_final; - zbc->outBuffContentSize = ZSTD_compressEnd(zbc->zc, zbc->outBuff, zbc->outBuffSize, NULL, 0); /* epilogue into outBuff */ + zbc->outBuffContentSize = !notEnded ? 0 : + ZSTD_compressEnd(zbc->zc, zbc->outBuff, zbc->outBuffSize, NULL, 0); /* write epilogue into outBuff */ } /* flush epilogue */ diff --git a/lib/decompress/zbuff_decompress.c b/lib/decompress/zbuff_decompress.c index 3c9ada6f0..b22bd84a6 100644 --- a/lib/decompress/zbuff_decompress.c +++ b/lib/decompress/zbuff_decompress.c @@ -158,9 +158,9 @@ size_t ZBUFF_decompressContinue(ZBUFF_DCtx* zbd, char* const ostart = (char*)dst; char* const oend = ostart + *dstCapacityPtr; char* op = ostart; - U32 notDone = 1; + U32 someMoreWork = 1; - while (notDone) { + while (someMoreWork) { switch(zbd->stage) { case ZBUFFds_init : @@ -168,9 +168,9 @@ size_t ZBUFF_decompressContinue(ZBUFF_DCtx* zbd, case ZBUFFds_loadHeader : { size_t const hSize = ZSTD_getFrameParams(&(zbd->fParams), zbd->headerBuffer, zbd->lhSize); - if (hSize != 0) { + if (ZSTD_isError(hSize)) return hSize; + if (hSize != 0) { /* need more input */ size_t const toLoad = hSize - zbd->lhSize; /* if hSize!=0, hSize > zbd->lhSize */ - if (ZSTD_isError(hSize)) return hSize; if (toLoad > (size_t)(iend-ip)) { /* not enough input to load full header */ memcpy(zbd->headerBuffer + zbd->lhSize, ip, iend-ip); zbd->lhSize += iend-ip; @@ -184,7 +184,7 @@ size_t ZBUFF_decompressContinue(ZBUFF_DCtx* zbd, /* Consume header */ { size_t const h1Size = ZSTD_nextSrcSizeToDecompress(zbd->zd); /* == ZSTD_frameHeaderSize_min */ size_t const h1Result = ZSTD_decompressContinue(zbd->zd, NULL, 0, zbd->headerBuffer, h1Size); - if (ZSTD_isError(h1Result)) return h1Result; + if (ZSTD_isError(h1Result)) return h1Result; /* should not happen : already checked */ if (h1Size < zbd->lhSize) { /* long header */ size_t const h2Size = ZSTD_nextSrcSizeToDecompress(zbd->zd); size_t const h2Result = ZSTD_decompressContinue(zbd->zd, NULL, 0, zbd->headerBuffer+h1Size, h2Size); @@ -195,6 +195,7 @@ size_t ZBUFF_decompressContinue(ZBUFF_DCtx* zbd, /* Frame header instruct buffer sizes */ { size_t const blockSize = MIN(zbd->fParams.windowSize, ZSTD_BLOCKSIZE_ABSOLUTEMAX); + size_t const neededOutSize = zbd->fParams.windowSize + blockSize; zbd->blockSize = blockSize; if (zbd->inBuffSize < blockSize) { zbd->customMem.customFree(zbd->customMem.opaque, zbd->inBuff); @@ -202,20 +203,20 @@ size_t ZBUFF_decompressContinue(ZBUFF_DCtx* zbd, zbd->inBuff = (char*)zbd->customMem.customAlloc(zbd->customMem.opaque, blockSize); if (zbd->inBuff == NULL) return ERROR(memory_allocation); } - { size_t const neededOutSize = zbd->fParams.windowSize + blockSize; - if (zbd->outBuffSize < neededOutSize) { - zbd->customMem.customFree(zbd->customMem.opaque, zbd->outBuff); - zbd->outBuffSize = neededOutSize; - zbd->outBuff = (char*)zbd->customMem.customAlloc(zbd->customMem.opaque, neededOutSize); - if (zbd->outBuff == NULL) return ERROR(memory_allocation); - } } } + if (zbd->outBuffSize < neededOutSize) { + zbd->customMem.customFree(zbd->customMem.opaque, zbd->outBuff); + zbd->outBuffSize = neededOutSize; + zbd->outBuff = (char*)zbd->customMem.customAlloc(zbd->customMem.opaque, neededOutSize); + if (zbd->outBuff == NULL) return ERROR(memory_allocation); + } } zbd->stage = ZBUFFds_read; + /* pass-through */ case ZBUFFds_read: { size_t const neededInSize = ZSTD_nextSrcSizeToDecompress(zbd->zd); if (neededInSize==0) { /* end of frame */ zbd->stage = ZBUFFds_init; - notDone = 0; + someMoreWork = 0; break; } if ((size_t)(iend-ip) >= neededInSize) { /* decode directly from src */ @@ -230,8 +231,9 @@ size_t ZBUFF_decompressContinue(ZBUFF_DCtx* zbd, zbd->stage = ZBUFFds_flush; break; } - if (ip==iend) { notDone = 0; break; } /* no more input */ + if (ip==iend) { someMoreWork = 0; break; } /* no more input */ zbd->stage = ZBUFFds_load; + /* pass-through */ } case ZBUFFds_load: @@ -242,7 +244,7 @@ size_t ZBUFF_decompressContinue(ZBUFF_DCtx* zbd, loadedSize = ZBUFF_limitCopy(zbd->inBuff + zbd->inPos, toLoad, ip, iend-ip); ip += loadedSize; zbd->inPos += loadedSize; - if (loadedSize < toLoad) { notDone = 0; break; } /* not enough input, wait for more */ + if (loadedSize < toLoad) { someMoreWork = 0; break; } /* not enough input, wait for more */ /* decode loaded input */ { const int isSkipFrame = ZSTD_isSkipFrame(zbd->zd); @@ -254,7 +256,7 @@ size_t ZBUFF_decompressContinue(ZBUFF_DCtx* zbd, if (!decodedSize && !isSkipFrame) { zbd->stage = ZBUFFds_read; break; } /* this was just a header */ zbd->outEnd = zbd->outStart + decodedSize; zbd->stage = ZBUFFds_flush; - // break; /* ZBUFFds_flush follows */ + /* pass-through */ } } case ZBUFFds_flush: @@ -262,14 +264,14 @@ size_t ZBUFF_decompressContinue(ZBUFF_DCtx* zbd, size_t const flushedSize = ZBUFF_limitCopy(op, oend-op, zbd->outBuff + zbd->outStart, toFlushSize); op += flushedSize; zbd->outStart += flushedSize; - if (flushedSize == toFlushSize) { + if (flushedSize == toFlushSize) { /* flush completed */ zbd->stage = ZBUFFds_read; if (zbd->outStart + zbd->blockSize > zbd->outBuffSize) zbd->outStart = zbd->outEnd = 0; break; } /* cannot flush everything */ - notDone = 0; + someMoreWork = 0; break; } default: return ERROR(GENERIC); /* impossible */ @@ -279,13 +281,15 @@ size_t ZBUFF_decompressContinue(ZBUFF_DCtx* zbd, *srcSizePtr = ip-istart; *dstCapacityPtr = op-ostart; { size_t nextSrcSizeHint = ZSTD_nextSrcSizeToDecompress(zbd->zd); + if (!nextSrcSizeHint) return (zbd->outEnd != zbd->outStart); /* return 0 only if fully flushed too */ + if (nextSrcSizeHint > 4) nextSrcSizeHint += ZSTD_blockHeaderSize; + if (zbd->inPos > nextSrcSizeHint) return ERROR(GENERIC); /* should never happen */ nextSrcSizeHint -= zbd->inPos; /* already loaded*/ return nextSrcSizeHint; } } - /* ************************************* * Tool functions ***************************************/ diff --git a/lib/decompress/zstd_decompress.c b/lib/decompress/zstd_decompress.c index 60f7568d1..5aa43790a 100644 --- a/lib/decompress/zstd_decompress.c +++ b/lib/decompress/zstd_decompress.c @@ -979,18 +979,12 @@ size_t ZSTD_decompress(void* dst, size_t dstCapacity, const void* src, size_t sr } -/*_****************************** -* Streaming Decompression API -********************************/ -size_t ZSTD_nextSrcSizeToDecompress(ZSTD_DCtx* dctx) -{ - return dctx->expected; -} +/*-********************************** +* Streaming Decompression API +************************************/ +size_t ZSTD_nextSrcSizeToDecompress(ZSTD_DCtx* dctx) { return dctx->expected; } -int ZSTD_isSkipFrame(ZSTD_DCtx* dctx) -{ - return dctx->stage == ZSTDds_skipFrame; -} +int ZSTD_isSkipFrame(ZSTD_DCtx* dctx) { return dctx->stage == ZSTDds_skipFrame; } /* for zbuff */ /** ZSTD_decompressContinue() : * @return : nb of bytes generated into `dst` (necessarily <= `dstCapacity) @@ -1079,11 +1073,12 @@ size_t ZSTD_decompressContinue(ZSTD_DCtx* dctx, void* dst, size_t dstCapacity, c if (dctx->stage == ZSTDds_decompressLastBlock) { /* end of frame */ if (dctx->fParams.checksumFlag) { /* another round for frame checksum */ - dctx->expected = 0; + dctx->expected = 4; dctx->stage = ZSTDds_checkChecksum; + } else { + dctx->expected = 0; /* ends here */ + dctx->stage = ZSTDds_getFrameHeaderSize; } - dctx->expected = 0; /* ends here */ - dctx->stage = ZSTDds_getFrameHeaderSize; } else { dctx->stage = ZSTDds_decodeBlockHeader; dctx->expected = ZSTD_blockHeaderSize; diff --git a/lib/zstd.h b/lib/zstd.h index e3663e8b7..01ac7d268 100644 --- a/lib/zstd.h +++ b/lib/zstd.h @@ -316,6 +316,7 @@ ZSTDLIB_API size_t ZSTD_sizeofDCtx(const ZSTD_DCtx* dctx); * But it's also a complex one, with a lot of restrictions (documented below). * For an easier streaming API, look into common/zbuff.h * which removes all restrictions by allocating and managing its own internal buffer */ + ZSTDLIB_API size_t ZSTD_compressBegin(ZSTD_CCtx* cctx, int compressionLevel); ZSTDLIB_API size_t ZSTD_compressBegin_usingDict(ZSTD_CCtx* cctx, const void* dict, size_t dictSize, int compressionLevel); ZSTDLIB_API size_t ZSTD_compressBegin_advanced(ZSTD_CCtx* cctx, const void* dict, size_t dictSize, ZSTD_parameters params, unsigned long long pledgedSrcSize); @@ -360,7 +361,7 @@ typedef struct { unsigned checksumFlag; } ZSTD_frameParams; -ZSTDLIB_API size_t ZSTD_getFrameParams(ZSTD_frameParams* fparamsPtr, const void* src, size_t srcSize); /**< doesn't consume input */ +ZSTDLIB_API size_t ZSTD_getFrameParams(ZSTD_frameParams* fparamsPtr, const void* src, size_t srcSize); /**< doesn't consume input, see details below */ ZSTDLIB_API size_t ZSTD_decompressBegin(ZSTD_DCtx* dctx); ZSTDLIB_API size_t ZSTD_decompressBegin_usingDict(ZSTD_DCtx* dctx, const void* dict, size_t dictSize); diff --git a/programs/datagencli.c b/programs/datagencli.c index d437d5cb3..c4fa7f73b 100644 --- a/programs/datagencli.c +++ b/programs/datagencli.c @@ -39,7 +39,7 @@ #define MB *(1 <<20) #define GB *(1U<<30) -#define SIZE_DEFAULT (64 KB) +#define SIZE_DEFAULT ((64 KB) + 1) #define SEED_DEFAULT 0 #define COMPRESSIBILITY_DEFAULT 50 @@ -72,15 +72,13 @@ static int usage(const char* programName) int main(int argc, const char** argv) { - int argNb; double proba = (double)COMPRESSIBILITY_DEFAULT / 100; double litProba = 0.0; U64 size = SIZE_DEFAULT; U32 seed = SEED_DEFAULT; - const char* programName; + const char* const programName = argv[0]; - /* Check command line */ - programName = argv[0]; + int argNb; for(argNb=1; argNb>20) ); if (toRead == 0) break; /* end of frame */ - if (readSize) EXM_THROW(38, "Decoding error : should consume entire input"); + if (readSize) EXM_THROW(37, "Decoding error : should consume entire input"); /* Fill input buffer */ - if (toRead > ress.srcBufferSize) EXM_THROW(34, "too large block"); + if (toRead > ress.srcBufferSize) EXM_THROW(38, "too large block"); readSize = fread(ress.srcBuffer, 1, toRead, finput); - if (readSize != toRead) - EXM_THROW(35, "Read error"); + if (readSize == 0) EXM_THROW(39, "Read error : premature end"); } FIO_fwriteSparseEnd(foutput, storedSkips); @@ -710,8 +709,8 @@ static int FIO_decompressSrcFile(dRess_t ress, const char* srcFileName) continue; } #endif - if (((magic & 0xFFFFFFF0U) != ZSTD_MAGIC_SKIPPABLE_START) && (magic != ZSTD_MAGICNUMBER)) { - if (g_overwrite) { /* -df : pass-through mode */ + if (((magic & 0xFFFFFFF0U) != ZSTD_MAGIC_SKIPPABLE_START) & (magic != ZSTD_MAGICNUMBER)) { + if ((g_overwrite) && !strcmp (srcFileName, stdinmark)) { /* pass-through mode */ unsigned const result = FIO_passThrough(dstFile, srcFile, ress.srcBuffer, ress.srcBufferSize); if (fclose(srcFile)) EXM_THROW(32, "zstd: %s close error", srcFileName); /* error should never happen */ return result; diff --git a/programs/playTests.sh b/programs/playTests.sh index ef44f7bdf..05eb49fce 100755 --- a/programs/playTests.sh +++ b/programs/playTests.sh @@ -142,8 +142,8 @@ $ECHO "\n**** multiple files tests **** " ./datagen -s1 > tmp1 2> $INTOVOID ./datagen -s2 -g100K > tmp2 2> $INTOVOID ./datagen -s3 -g1M > tmp3 2> $INTOVOID -$ZSTD -f tmp* $ECHO "compress tmp* : " +$ZSTD -f tmp* ls -ls tmp* rm tmp1 tmp2 tmp3 $ECHO "decompress tmp* : " diff --git a/programs/zbufftest.c b/programs/zbufftest.c index 3e36d015f..ce6beb246 100644 --- a/programs/zbufftest.c +++ b/programs/zbufftest.c @@ -424,23 +424,22 @@ static int fuzzerTests(U32 seed, U32 nbTests, unsigned startTest, double compres U32 const enoughDstSize = dstBuffSize >= remainingToFlush; remainingToFlush = ZBUFF_compressEnd(zc, cBuffer+cSize, &dstBuffSize); CHECK (ZBUFF_isError(remainingToFlush), "flush error : %s", ZBUFF_getErrorName(remainingToFlush)); - //DISPLAY("flush %u bytes : still within context : %i \n", (U32)dstBuffSize, (int)remainingToFlush); - CHECK (enoughDstSize && remainingToFlush, "ZBUFF_compressEnd() not fully flushed, but enough space available"); + CHECK (enoughDstSize && remainingToFlush, "ZBUFF_compressEnd() not fully flushed (%u remaining), but enough space available", (U32)remainingToFlush); cSize += dstBuffSize; } } crcOrig = XXH64_digest(&xxhState); /* multi - fragments decompression test */ ZBUFF_decompressInitDictionary(zd, dict, dictSize); - for (totalCSize = 0, totalGenSize = 0 ; totalCSize < cSize ; ) { + errorCode = 1; + for (totalCSize = 0, totalGenSize = 0 ; errorCode ; ) { size_t readCSrcSize = FUZ_randomLength(&lseed, maxSampleLog); size_t const randomDstSize = FUZ_randomLength(&lseed, maxSampleLog); size_t dstBuffSize = MIN(dstBufferSize - totalGenSize, randomDstSize); - size_t const decompressError = ZBUFF_decompressContinue(zd, dstBuffer+totalGenSize, &dstBuffSize, cBuffer+totalCSize, &readCSrcSize); - CHECK (ZBUFF_isError(decompressError), "decompression error : %s", ZBUFF_getErrorName(decompressError)); + errorCode = ZBUFF_decompressContinue(zd, dstBuffer+totalGenSize, &dstBuffSize, cBuffer+totalCSize, &readCSrcSize); + CHECK (ZBUFF_isError(errorCode), "decompression error : %s", ZBUFF_getErrorName(errorCode)); totalGenSize += dstBuffSize; totalCSize += readCSrcSize; - errorCode = decompressError; /* needed for != 0 last test */ } CHECK (errorCode != 0, "frame not fully decoded"); CHECK (totalGenSize != totalTestSize, "decompressed data : wrong size") diff --git a/programs/zstd.1 b/programs/zstd.1 index 9b6be9349..d2dfc3c14 100644 --- a/programs/zstd.1 +++ b/programs/zstd.1 @@ -80,7 +80,8 @@ It also features a very fast decoder, with speed > 500 MB/s per core. verbose mode .TP .BR \-q ", " --quiet - suppress warnings and notifications; specify twice to suppress errors too + suppress warnings, interactivity and notifications. + specify twice to suppress errors too. .TP .BR \-C ", " --check add integrity check computed from uncompressed data