]> git.ipfire.org Git - thirdparty/zstd.git/commitdiff
joined normal streaming API with advanced one
authorYann Collet <cyan@fb.com>
Mon, 3 Dec 2018 22:22:38 +0000 (14:22 -0800)
committerYann Collet <cyan@fb.com>
Mon, 3 Dec 2018 22:22:38 +0000 (14:22 -0800)
lib/compress/zstd_compress.c
lib/compress/zstdmt_compress.c
lib/compress/zstdmt_compress.h
lib/zstd.h
tests/fuzzer.c

index 7af7e78810b9c6a41ff56f8a852a0b7fb9e6c6df..06a3f101ecdf1939de18c0a6a77d2cfa7943a2fd 100644 (file)
@@ -3788,8 +3788,15 @@ size_t ZSTD_initCStream(ZSTD_CStream* zcs, int compressionLevel)
 
 /*======   Compression   ======*/
 
-MEM_STATIC size_t ZSTD_limitCopy(void* dst, size_t dstCapacity,
-                           const void* src, size_t srcSize)
+static size_t ZSTD_nextInputSizeHint(const ZSTD_CCtx* cctx)
+{
+    size_t hintInSize = cctx->inBuffTarget - cctx->inBuffPos;
+    if (hintInSize==0) hintInSize = cctx->blockSize;
+    return hintInSize;
+}
+
+static size_t ZSTD_limitCopy(void* dst, size_t dstCapacity,
+                       const void* src, size_t srcSize)
 {
     size_t const length = MIN(dstCapacity, srcSize);
     if (length) memcpy(dst, src, length);
@@ -3797,7 +3804,7 @@ MEM_STATIC size_t ZSTD_limitCopy(void* dst, size_t dstCapacity,
 }
 
 /** ZSTD_compressStream_generic():
- *  internal function for all *compressStream*() variants and *compress_generic()
+ *  internal function for all *compressStream*() variants
  *  non-static, because can be called from zstdmt_compress.c
  * @return : hint size for next input */
 size_t ZSTD_compressStream_generic(ZSTD_CStream* zcs,
@@ -3937,19 +3944,25 @@ size_t ZSTD_compressStream_generic(ZSTD_CStream* zcs,
     input->pos = ip - istart;
     output->pos = op - ostart;
     if (zcs->frameEnded) return 0;
-    {   size_t hintInSize = zcs->inBuffTarget - zcs->inBuffPos;
-        if (hintInSize==0) hintInSize = zcs->blockSize;
-        return hintInSize;
+    return ZSTD_nextInputSizeHint(zcs);
+}
+
+static size_t ZSTD_nextInputSizeHint_MTorST(const ZSTD_CCtx* cctx)
+{
+#ifdef ZSTD_MULTITHREAD
+    if (cctx->appliedParams.nbWorkers >= 1) {
+        assert(cctx->mtctx != NULL);
+        return ZSTDMT_nextInputSizeHint(cctx->mtctx);
     }
+#endif
+    return ZSTD_nextInputSizeHint(cctx);
+
 }
 
 size_t ZSTD_compressStream(ZSTD_CStream* zcs, ZSTD_outBuffer* output, ZSTD_inBuffer* input)
 {
-    /* check conditions */
-    if (output->pos > output->size) return ERROR(GENERIC);
-    if (input->pos  > input->size)  return ERROR(GENERIC);
-
-    return ZSTD_compressStream_generic(zcs, output, input, ZSTD_e_continue);
+    CHECK_F( ZSTD_compressStream2(zcs, output, input, ZSTD_e_continue) );
+    return ZSTD_nextInputSizeHint_MTorST(zcs);
 }
 
 
@@ -4005,6 +4018,7 @@ size_t ZSTD_compressStream2( ZSTD_CCtx* cctx,
             assert(cctx->streamStage == zcss_load);
             assert(cctx->appliedParams.nbWorkers == 0);
     }   }
+    /* end of transparent initialization stage */
 
     /* compression stage */
 #ifdef ZSTD_MULTITHREAD
@@ -4054,6 +4068,10 @@ size_t ZSTD_compress2(ZSTD_CCtx* cctx,
                                     ZSTD_e_end);
     assert(iPos == srcSize);
     if (ZSTD_isError(result)) return result;
+    if (result != 0) {  /* compression not completed, due to lack of output space */
+        assert(oPos == dstCapacity);
+        return ERROR(dstSize_tooSmall);
+    }
     return oPos;
 }
 
@@ -4064,20 +4082,20 @@ size_t ZSTD_compress2(ZSTD_CCtx* cctx,
 size_t ZSTD_flushStream(ZSTD_CStream* zcs, ZSTD_outBuffer* output)
 {
     ZSTD_inBuffer input = { NULL, 0, 0 };
-    if (output->pos > output->size) return ERROR(GENERIC);
-    CHECK_F( ZSTD_compressStream_generic(zcs, output, &input, ZSTD_e_flush) );
-    return zcs->outBuffContentSize - zcs->outBuffFlushedSize;  /* remaining to flush */
+    return ZSTD_compressStream2(zcs, output, &input, ZSTD_e_flush);
 }
 
 
 size_t ZSTD_endStream(ZSTD_CStream* zcs, ZSTD_outBuffer* output)
 {
     ZSTD_inBuffer input = { NULL, 0, 0 };
-    if (output->pos > output->size) return ERROR(GENERIC);
-    CHECK_F( ZSTD_compressStream_generic(zcs, output, &input, ZSTD_e_end) );
+    size_t const remainingToFlush = ZSTD_compressStream2(zcs, output, &input, ZSTD_e_end);
+    CHECK_F( remainingToFlush );
+    if (zcs->appliedParams.nbWorkers > 0) return remainingToFlush;   /* minimal estimation */
+    /* single thread mode : attempt to calculate remaining to flush more precisely */
     {   size_t const lastBlockSize = zcs->frameEnded ? 0 : ZSTD_BLOCKHEADERSIZE;
         size_t const checksumSize = zcs->frameEnded ? 0 : zcs->appliedParams.fParams.checksumFlag * 4;
-        size_t const toFlush = zcs->outBuffContentSize - zcs->outBuffFlushedSize + lastBlockSize + checksumSize;
+        size_t const toFlush = remainingToFlush + lastBlockSize + checksumSize;
         DEBUGLOG(4, "ZSTD_endStream : remaining to flush : %u", (U32)toFlush);
         return toFlush;
     }
index 465ab1e4491959cef8f4fe70df052593e090dddd..4a9a626cc4e62ca0d4c556ac0027e9770a36dd74 100644 (file)
@@ -1844,7 +1844,9 @@ typedef struct {
  * Otherwise, we will load as many bytes as possible and instruct the caller
  * to continue as normal.
  */
-static syncPoint_t findSynchronizationPoint(ZSTDMT_CCtx const* mtctx, ZSTD_inBuffer const input) {
+static syncPoint_t
+findSynchronizationPoint(ZSTDMT_CCtx const* mtctx, ZSTD_inBuffer const input)
+{
     BYTE const* const istart = (BYTE const*)input.src + input.pos;
     U64 const primePower = mtctx->rsync.primePower;
     U64 const hitMask = mtctx->rsync.hitMask;
@@ -1908,6 +1910,13 @@ static syncPoint_t findSynchronizationPoint(ZSTDMT_CCtx const* mtctx, ZSTD_inBuf
     return syncPoint;
 }
 
+size_t ZSTDMT_nextInputSizeHint(const ZSTDMT_CCtx* mtctx)
+{
+    size_t hintInSize = mtctx->targetSectionSize - mtctx->inBuff.filled;
+    if (hintInSize==0) hintInSize = mtctx->targetSectionSize;
+    return hintInSize;
+}
+
 /** ZSTDMT_compressStream_generic() :
  *  internal use only - exposed to be invoked from zstd_compress.c
  *  assumption : output and input are valid (pos <= size)
index 666b506e890c0e66b19d4f3aeec2b20e1b62df7a..c60ec83225266e6b15bbc4865dec6a73d95fbc10 100644 (file)
@@ -60,6 +60,7 @@ ZSTDLIB_API size_t ZSTDMT_compressCCtx(ZSTDMT_CCtx* mtctx,
 ZSTDLIB_API size_t ZSTDMT_initCStream(ZSTDMT_CCtx* mtctx, int compressionLevel);
 ZSTDLIB_API size_t ZSTDMT_resetCStream(ZSTDMT_CCtx* mtctx, unsigned long long pledgedSrcSize);  /**< if srcSize is not known at reset time, use ZSTD_CONTENTSIZE_UNKNOWN. Note: for compatibility with older programs, 0 means the same as ZSTD_CONTENTSIZE_UNKNOWN, but it will change in the future to mean "empty" */
 
+ZSTDLIB_API size_t ZSTDMT_nextInputSizeHint(const ZSTDMT_CCtx* mtctx);
 ZSTDLIB_API size_t ZSTDMT_compressStream(ZSTDMT_CCtx* mtctx, ZSTD_outBuffer* output, ZSTD_inBuffer* input);
 
 ZSTDLIB_API size_t ZSTDMT_flushStream(ZSTDMT_CCtx* mtctx, ZSTD_outBuffer* output);   /**< @return : 0 == all flushed; >0 : still some data to be flushed; or an error code (ZSTD_isError()) */
index ff48970903e9fe3dce41447b3b1ad511927bdd39..d5bd738f79bdf475b4f824e63ea3be20b0a2acb8 100644 (file)
@@ -289,13 +289,17 @@ typedef struct ZSTD_outBuffer_s {
 *  A ZSTD_CStream object is required to track streaming operation.
 *  Use ZSTD_createCStream() and ZSTD_freeCStream() to create/release resources.
 *  ZSTD_CStream objects can be reused multiple times on consecutive compression operations.
-*  It is recommended to re-use ZSTD_CStream in situations where many streaming operations will be achieved consecutively,
-*  since it will play nicer with system's memory, by re-using already allocated memory.
-*  Use one separate ZSTD_CStream per thread for parallel execution.
+*  It is recommended to re-use ZSTD_CStream since it will play nicer with system's memory, by re-using already allocated memory.
 *
-*  Start a new compression by initializing ZSTD_CStream context.
-*  Use ZSTD_initCStream() to start a new compression operation.
-*  Use variants ZSTD_initCStream_usingDict() or ZSTD_initCStream_usingCDict() for streaming with dictionary (experimental section)
+*  For parallel execution, use one separate ZSTD_CStream per thread.
+*
+*  note : since v1.3.0, ZSTD_CStream and ZSTD_CCtx are the same thing.
+*
+*  Parameters are sticky : when starting a new compression on the same context,
+*  it will re-use the same sticky parameters as previous compression session.
+*  It's recommended to initialize the context before every usage.
+*  Use ZSTD_initCStream() to set the parameter to a selected compression level.
+*  Use advanced API (ZSTD_CCtx_setParameter(), etc.) to set more detailed parameters.
 *
 *  Use ZSTD_compressStream() as many times as necessary to consume input stream.
 *  The function will automatically update both `pos` fields within `input` and `output`.
@@ -306,10 +310,10 @@ typedef struct ZSTD_outBuffer_s {
 *  If not, the caller must make some room to receive more compressed data,
 *  typically by emptying output buffer, or allocating a new output buffer,
 *  and then present again remaining input data.
-*  @return : a size hint, preferred nb of bytes to use as input for next function call
-*            or an error code, which can be tested using ZSTD_isError().
-*            Note 1 : it's just a hint, to help latency a little, any other value will work fine.
-*            Note 2 : size hint is guaranteed to be <= ZSTD_CStreamInSize()
+* @return : a size hint, preferred nb of bytes to use as input for next function call
+*           or an error code, which can be tested using ZSTD_isError().
+*           Note 1 : it's just a hint, to help latency a little, any value will work fine.
+*           Note 2 : size hint is guaranteed to be <= ZSTD_CStreamInSize()
 *
 *  At any moment, it's possible to flush whatever data might remain stuck within internal buffer,
 *  using ZSTD_flushStream(). `output->pos` will be updated.
@@ -757,21 +761,22 @@ typedef enum {
     ZSTD_e_flush=1,    /* flush any data provided so far,
                         * it creates (at least) one new block, that can be decoded immediately on reception;
                         * frame will continue: any future data can still reference previously compressed data, improving compression. */
-    ZSTD_e_end=2       /* flush any remaining data and close current frame.
-                        * any additional data starts a new frame.
-                        * each frame is independent (does not reference any content from previous frame). */
+    ZSTD_e_end=2       /* flush any remaining data _and_ close current frame.
+                        * note that frame is only closed after compressed data is fully flushed (return value == 0).
+                        * After that point, any additional data starts a new frame.
+                        * note : each frame is independent (does not reference any content from previous frame). */
 } ZSTD_EndDirective;
 
 /*! ZSTD_compressStream2() :
- *  Behave about the same as ZSTD_compressStream, with additional control on end directive.
+ *  Behaves about the same as ZSTD_compressStream, with additional control on end directive.
  *  - Compression parameters are pushed into CCtx before starting compression, using ZSTD_CCtx_set*()
  *  - Compression parameters cannot be changed once compression is started (save a list of exceptions in multi-threading mode)
  *  - outpot->pos must be <= dstCapacity, input->pos must be <= srcSize
  *  - outpot->pos and input->pos will be updated. They are guaranteed to remain below their respective limit.
- *  - In single-thread mode (default), function is blocking : it completes its job before returning to caller.
- *  - In multi-thread mode, function is non-blocking : it just acquires a copy of input, and distribute job to internal worker threads,
- *                                                     and then immediately returns, just indicating that there is some data remaining to be flushed.
- *                                                     The function nonetheless guarantees forward progress : it will return only after it reads or write at least 1+ byte.
+ *  - When nbWorkers==0 (default), function is blocking : it completes its job before returning to caller.
+ *  - When nbWorkers>=1, function is non-blocking : it just acquires a copy of input, and distributes jobs to internal worker threads,
+ *                                                  and then immediately returns, just indicating that there is some data remaining to be flushed.
+ *                                                  The function nonetheless guarantees forward progress : it will return only after it reads or write at least 1+ byte.
  *  - Exception : if the first call requests a ZSTD_e_end directive, the function delegates to ZSTD_compress2() which is always blocking.
  *  - @return provides a minimum amount of data remaining to be flushed from internal buffers
  *            or an error code, which can be tested using ZSTD_isError().
index 0afa21a3aebe8982a854acce24f74b741f2ab7fd..dd6e260c2a42caa16a13fc1f96fa88a07a07eb32 100644 (file)
@@ -233,11 +233,9 @@ static int FUZ_mallocTests_internal(unsigned seed, double compressibility, unsig
                 mallocCounter_t malcount = INIT_MALLOC_COUNTER;
                 ZSTD_customMem const cMem = { FUZ_mallocDebug, FUZ_freeDebug, &malcount };
                 ZSTD_CCtx* const cctx = ZSTD_createCCtx_advanced(cMem);
-                ZSTD_outBuffer out = { outBuffer, outSize, 0 };
-                ZSTD_inBuffer in = { inBuffer, inSize, 0 };
                 CHECK_Z( ZSTD_CCtx_setParameter(cctx, ZSTD_p_compressionLevel, compressionLevel) );
                 CHECK_Z( ZSTD_CCtx_setParameter(cctx, ZSTD_p_nbWorkers, nbThreads) );
-                while ( ZSTD_compressStream2(cctx, &out, &in, ZSTD_e_end) ) {}
+                CHECK_Z( ZSTD_compress2(cctx, outBuffer, outSize, inBuffer, inSize) );
                 ZSTD_freeCCtx(cctx);
                 DISPLAYLEVEL(3, "compress_generic,-T%u,end level %i : ",
                                 nbThreads, compressionLevel);
@@ -1253,12 +1251,12 @@ static int basicUnitTests(U32 seed, double compressibility)
             if (ZSTD_isError(cSize_1pass)) goto _output_error;
 
             CHECK( ZSTD_CCtx_setParameter(cctx, ZSTD_p_compressionLevel, compressionLevel) );
-            {   ZSTD_inBuffer in = { CNBuffer, srcSize, 0 };
-                ZSTD_outBuffer out = { compressedBuffer, compressedBufferSize, 0 };
-                size_t const compressionResult = ZSTD_compressStream2(cctx, &out, &in, ZSTD_e_end);
-                DISPLAYLEVEL(5, "simple=%zu vs %zu=advanced : ", cSize_1pass, out.pos);
+            {   size_t const compressionResult = ZSTD_compress2(cctx,
+                                    compressedBuffer, compressedBufferSize,
+                                    CNBuffer, srcSize);
+                DISPLAYLEVEL(5, "simple=%zu vs %zu=advanced : ", cSize_1pass, compressionResult);
                 if (ZSTD_isError(compressionResult)) goto _output_error;
-                if (out.pos != cSize_1pass) goto _output_error;
+                if (compressionResult != cSize_1pass) goto _output_error;
         }   }
         ZSTD_freeCCtx(cctx);
     }
@@ -1274,13 +1272,12 @@ static int basicUnitTests(U32 seed, double compressibility)
             CHECK( ZSTD_CCtx_setParameter(cctx, ZSTD_p_compressionLevel, 2) );
             CHECK( ZSTD_CCtx_setParameter(cctx, ZSTD_p_enableLongDistanceMatching, 1) );
             CHECK( ZSTD_CCtx_setParameter(cctx, ZSTD_p_windowLog, 18) );
-            {   ZSTD_inBuffer in = { CNBuffer, inputSize, 0 };
-                ZSTD_outBuffer out = { compressedBuffer, ZSTD_compressBound(inputSize), 0 };
-                size_t const result = ZSTD_compressStream2(cctx, &out, &in, ZSTD_e_end);
-                if (result != 0) goto _output_error;
-                if (in.pos != in.size) goto _output_error;
-                cSize = out.pos;
-                xxh64 = XXH64(out.dst, out.pos, 0);
+            {   size_t const compressedSize = ZSTD_compress2(cctx,
+                                compressedBuffer, ZSTD_compressBound(inputSize),
+                                CNBuffer, inputSize);
+                CHECK(compressedSize);
+                cSize = compressedSize;
+                xxh64 = XXH64(compressedBuffer, compressedSize, 0);
             }
             DISPLAYLEVEL(3, "OK (compress : %u -> %u bytes)\n", (U32)inputSize, (U32)cSize);
             ZSTD_freeCCtx(cctx);
@@ -1291,14 +1288,13 @@ static int basicUnitTests(U32 seed, double compressibility)
             CHECK( ZSTD_CCtx_setParameter(cctx, ZSTD_p_windowLog, 18) );
             CHECK( ZSTD_CCtx_setParameter(cctx, ZSTD_p_enableLongDistanceMatching, 1) );
             CHECK( ZSTD_CCtx_setParameter(cctx, ZSTD_p_compressionLevel, 2) );
-            {   ZSTD_inBuffer in = { CNBuffer, inputSize, 0 };
-                ZSTD_outBuffer out = { compressedBuffer, ZSTD_compressBound(inputSize), 0 };
-                size_t const result = ZSTD_compressStream2(cctx, &out, &in, ZSTD_e_end);
-                if (result != 0) goto _output_error;
-                if (in.pos != in.size) goto _output_error;
-                if (out.pos != cSize) goto _output_error;   /* must result in same compressed result, hence same size */
-                if (XXH64(out.dst, out.pos, 0) != xxh64) goto _output_error;  /* must result in exactly same content, hence same hash */
-                DISPLAYLEVEL(3, "OK (compress : %u -> %u bytes)\n", (U32)inputSize, (U32)out.pos);
+            {   size_t const result = ZSTD_compress2(cctx,
+                                compressedBuffer, ZSTD_compressBound(inputSize),
+                                CNBuffer, inputSize);
+                CHECK(result);
+                if (result != cSize) goto _output_error;   /* must result in same compressed result, hence same size */
+                if (XXH64(compressedBuffer, result, 0) != xxh64) goto _output_error;  /* must result in exactly same content, hence same hash */
+                DISPLAYLEVEL(3, "OK (compress : %u -> %u bytes)\n", (U32)inputSize, (U32)result);
             }
             ZSTD_freeCCtx(cctx);
         }