]> git.ipfire.org Git - thirdparty/zstd.git/commitdiff
[lib] Ensure that multithreaded compression always makes some progress 2414/head
authorNick Terrell <terrelln@fb.com>
Fri, 4 Dec 2020 04:25:14 +0000 (20:25 -0800)
committerNick Terrell <terrelln@fb.com>
Fri, 4 Dec 2020 04:25:14 +0000 (20:25 -0800)
lib/compress/zstd_compress.c
tests/zstreamtest.c

index 6830020adb8eeaba20c39c5ad70881071069d6ac..ed142b2c816ccc1f028b0b27d9dff49d646fdc4c 100644 (file)
@@ -4441,26 +4441,42 @@ size_t ZSTD_compressStream2( ZSTD_CCtx* cctx,
     /* compression stage */
 #ifdef ZSTD_MULTITHREAD
     if (cctx->appliedParams.nbWorkers > 0) {
-        int const forceMaxProgress = (endOp == ZSTD_e_flush || endOp == ZSTD_e_end);
         size_t flushMin;
-        assert(forceMaxProgress || endOp == ZSTD_e_continue /* Protection for a new flush type */);
         if (cctx->cParamsChanged) {
             ZSTDMT_updateCParams_whileCompressing(cctx->mtctx, &cctx->requestedParams);
             cctx->cParamsChanged = 0;
         }
-        do {
+        for (;;) {
+            size_t const ipos = input->pos;
+            size_t const opos = output->pos;
             flushMin = ZSTDMT_compressStream_generic(cctx->mtctx, output, input, endOp);
             if ( ZSTD_isError(flushMin)
               || (endOp == ZSTD_e_end && flushMin == 0) ) { /* compression completed */
                 ZSTD_CCtx_reset(cctx, ZSTD_reset_session_only);
             }
             FORWARD_IF_ERROR(flushMin, "ZSTDMT_compressStream_generic failed");
-        } while (forceMaxProgress && flushMin != 0 && output->pos < output->size);
+
+            if (endOp == ZSTD_e_continue) {
+                /* We only require some progress with ZSTD_e_continue, not maximal progress.
+                 * We're done if we've consumed or produced any bytes, or either buffer is
+                 * full.
+                 */
+                if (input->pos != ipos || output->pos != opos || input->pos == input->size || output->pos == output->size)
+                    break;
+            } else {
+                assert(endOp == ZSTD_e_flush || endOp == ZSTD_e_end);
+                /* We require maximal progress. We're done when the flush is complete or the
+                 * output buffer is full.
+                 */
+                if (flushMin == 0 || output->pos == output->size)
+                    break;
+            }
+        }
         DEBUGLOG(5, "completed ZSTD_compressStream2 delegating to ZSTDMT_compressStream_generic");
         /* Either we don't require maximum forward progress, we've finished the
          * flush, or we are out of output space.
          */
-        assert(!forceMaxProgress || flushMin == 0 || output->pos == output->size);
+        assert(endOp == ZSTD_e_continue || flushMin == 0 || output->pos == output->size);
         ZSTD_setBufferExpectations(cctx, output, input);
         return flushMin;
     }
index 1855b4dec3c5ac82cbb00295c0e2b26fc8f464f8..fa18ea4b47263c362fef992ead22bf82311af671 100644 (file)
@@ -2299,6 +2299,7 @@ static int fuzzerTests_newAPI(U32 seed, int nbTests, int startTest,
                             /* Ensure maximal forward progress for determinism */
                             forwardProgress = (inBuff.pos != ipos) || (outBuff.pos != opos);
                         } while (forwardProgress);
+                        assert(inBuff.pos == inBuff.size);
 
                         XXH64_update(&xxhState, srcBuffer+srcStart, inBuff.pos);
                         memcpy(copyBuffer+totalTestSize, srcBuffer+srcStart, inBuff.pos);