]> git.ipfire.org Git - thirdparty/zstd.git/commitdiff
[libzstd] Fix ZSTD_compress2() for multithreaded compression
authorNick Terrell <terrelln@fb.com>
Tue, 9 Apr 2019 23:24:17 +0000 (16:24 -0700)
committerNick Terrell <terrelln@fb.com>
Tue, 9 Apr 2019 23:24:17 +0000 (16:24 -0700)
`ZSTD_compress2()` wouldn't wait for multithreaded compression to
finish. We didn't find this because ZSTDMT will block when it can
compress all in one go, but it can't do that if it doesn't have enough
output space, or if `ZSTD_c_rsyncable` is enabled.

Since we will already sometimes block when using `ZSTD_e_end`, I've
changed `ZSTD_e_end` and `ZSTD_e_flush` to guarantee maximum forward
progress. This simplifies the API, and helps users avoid the easy bug
that was made in `ZSTD_compress2()`

* Found by the libfuzzer fuzzers.
* Added a test case that catches the problem.
* I will make the fuzzers sometimes allocate less than
  `ZSTD_compressBound()` output space.

lib/compress/zstd_compress.c
lib/zstd.h
tests/fuzzer.c

index f2b9e03edc4f126a497d4ff6a01aa3243264e4d0..4a9f6b7c892a0f6fb69458f34f6cf75ae1e9eafc 100644 (file)
@@ -4179,18 +4179,28 @@ size_t ZSTD_compressStream2( ZSTD_CCtx* cctx,
     /* compression stage */
 #ifdef ZSTD_MULTITHREAD
     if (cctx->appliedParams.nbWorkers > 0) {
+        int const forceMaxProgress = (endOp == ZSTD_e_flush || endOp == ZSTD_e_end);
+        size_t flushMin;
+        assert(forceMaxProgress || endOp == ZSTD_e_continue /* Protection for a new flush type */);
         if (cctx->cParamsChanged) {
             ZSTDMT_updateCParams_whileCompressing(cctx->mtctx, &cctx->requestedParams);
             cctx->cParamsChanged = 0;
         }
-        {   size_t const flushMin = ZSTDMT_compressStream_generic(cctx->mtctx, output, input, endOp);
+        do {
+            flushMin = ZSTDMT_compressStream_generic(cctx->mtctx, output, input, endOp);
             if ( ZSTD_isError(flushMin)
               || (endOp == ZSTD_e_end && flushMin == 0) ) { /* compression completed */
                 ZSTD_CCtx_reset(cctx, ZSTD_reset_session_only);
             }
-            DEBUGLOG(5, "completed ZSTD_compressStream2 delegating to ZSTDMT_compressStream_generic");
-            return flushMin;
-    }   }
+            FORWARD_IF_ERROR(flushMin);
+        } while (forceMaxProgress && flushMin != 0 && output->pos < output->size);
+        DEBUGLOG(5, "completed ZSTD_compressStream2 delegating to ZSTDMT_compressStream_generic");
+        /* Either we don't require maximum forward progress, we've finished the
+         * flush, or we are out of output space.
+         */
+        assert(!forceMaxProgress || flushMin == 0 || output->pos == output->size);
+        return flushMin;
+    }
 #endif
     FORWARD_IF_ERROR( ZSTD_compressStream_generic(cctx, output, input, endOp) );
     DEBUGLOG(5, "completed ZSTD_compressStream2");
index dc6348659bdaa960a1c5f2854e97518bb494f593..0c9ebe5b61fd05536b6d56b24082bf1db09ce994 100644 (file)
@@ -577,6 +577,11 @@ typedef struct ZSTD_outBuffer_s {
 *  The caller must check if input has been entirely consumed.
 *  If not, the caller must make some room to receive more compressed data,
 *  and then present again remaining input data.
+*  note: ZSTD_e_continue is guaranteed to make some forward progress when called,
+*        but doesn't guarantee maximal forward progress. This is especially relevant
+*        when compressing with multiple threads. The call won't block if it can
+*        consume some input, but if it can't it will wait for some, but not all,
+*        output to be flushed.
 * @return : provides a minimum amount of data remaining to be flushed from internal buffers
 *           or an error code, which can be tested using ZSTD_isError().
 *
@@ -586,6 +591,8 @@ typedef struct ZSTD_outBuffer_s {
 *  In which case, make some room to receive more compressed data, and call again ZSTD_compressStream2() with ZSTD_e_flush.
 *  You must continue calling ZSTD_compressStream2() with ZSTD_e_flush until it returns 0, at which point you can change the
 *  operation.
+*  note: ZSTD_e_flush will flush as much output as possible, meaning when compressing with multiple threads, it will
+*        block until the flush is complete or the output buffer is full.
 *  @return : 0 if internal buffers are entirely flushed,
 *            >0 if some data still present within internal buffer (the value is minimal estimation of remaining size),
 *            or an error code, which can be tested using ZSTD_isError().
@@ -596,6 +603,8 @@ typedef struct ZSTD_outBuffer_s {
 *  flush operation is the same, and follows same rules as calling ZSTD_compressStream2() with ZSTD_e_flush.
 *  You must continue calling ZSTD_compressStream2() with ZSTD_e_end until it returns 0, at which point you are free to
 *  start a new frame.
+*  note: ZSTD_e_end will flush as much output as possible, meaning when compressing with multiple threads, it will
+*        block until the flush is complete or the output buffer is full.
 *  @return : 0 if frame fully completed and fully flushed,
 *            >0 if some data still present within internal buffer (the value is minimal estimation of remaining size),
 *            or an error code, which can be tested using ZSTD_isError().
@@ -613,11 +622,13 @@ typedef enum {
     ZSTD_e_continue=0, /* collect more data, encoder decides when to output compressed result, for optimal compression ratio */
     ZSTD_e_flush=1,    /* flush any data provided so far,
                         * it creates (at least) one new block, that can be decoded immediately on reception;
-                        * frame will continue: any future data can still reference previously compressed data, improving compression. */
+                        * frame will continue: any future data can still reference previously compressed data, improving compression.
+                        * note : multithreaded compression will block to flush as much output as possible. */
     ZSTD_e_end=2       /* flush any remaining data _and_ close current frame.
                         * note that frame is only closed after compressed data is fully flushed (return value == 0).
                         * After that point, any additional data starts a new frame.
-                        * note : each frame is independent (does not reference any content from previous frame). */
+                        * note : each frame is independent (does not reference any content from previous frame).
+                        : note : multithreaded compression will block to flush as much output as possible. */
 } ZSTD_EndDirective;
 
 /*! ZSTD_compressStream2() :
index 7bc2f10cbb2dcfe238a520abc7ef5f4f56c8b612..cfb07eb1042042f58500dcfb91f5336d11014c8b 100644 (file)
@@ -880,6 +880,19 @@ static int basicUnitTests(U32 seed, double compressibility)
     }
     DISPLAYLEVEL(3, "OK \n");
 
+    DISPLAYLEVEL(3, "test%3i : Multithreaded ZSTD_compress2() with rsyncable : ", testNb++)
+    {   ZSTD_CCtx* cctx = ZSTD_createCCtx();
+        /* Set rsyncable and don't give the ZSTD_compressBound(CNBuffSize) so
+         * ZSTDMT is forced to not take the shortcut.
+         */
+        CHECK( ZSTD_CCtx_setParameter(cctx, ZSTD_c_compressionLevel, 1) );
+        CHECK( ZSTD_CCtx_setParameter(cctx, ZSTD_c_nbWorkers, 1) );
+        CHECK( ZSTD_CCtx_setParameter(cctx, ZSTD_c_rsyncable, 1) );
+        CHECK( ZSTD_compress2(cctx, compressedBuffer, compressedBufferSize - 1, CNBuffer, CNBuffSize) );
+        ZSTD_freeCCtx(cctx);
+    }
+    DISPLAYLEVEL(3, "OK \n");
+
     DISPLAYLEVEL(3, "test%3i : setting multithreaded parameters : ", testNb++)
     {   ZSTD_CCtx_params* params = ZSTD_createCCtxParams();
         int value;